可取消的異步任務(wù)——FutureTask用法及解析 - 簡書
FutureTask的用法。
開發(fā)中我遇到的問題。
結(jié)合FutureTask的源碼分析問題。
1. FutureTask的用法
在Java中,一般是通過繼承Thread類或者實(shí)現(xiàn)Runnable接口來創(chuàng)建多線程,Runnable接口不能返回結(jié)果,如果要獲取子線程的執(zhí)行結(jié)果,一般都是在子線程執(zhí)行結(jié)束之后,通過Handler將結(jié)果返回到調(diào)用線程,jdk1.5之后,Java提供了Callable接口來封裝子任務(wù),Callable接口可以獲取返回結(jié)果。
與FutureTask相關(guān)的類或接口,有Runnable,Callable,F(xiàn)uture,直接從Callable開始。
Callable接口
下面可以看一下Callable接口的定義:
publicinterfaceCallable{/**? ? * Computes a result, or throws an exception if unable to do so.? ? *? ? *@returncomputed result? ? *@throwsException if unable to compute a result? ? */Vcall()throwsException;}
Callable接口很簡單,是一個(gè)泛型接口,就是定義了一個(gè)call()方法,與Runnable的run()方法相比,這個(gè)有返回值,泛型V就是要返回的結(jié)果類型,可以返回子任務(wù)的執(zhí)行結(jié)果。
Future接口
Future接口表示異步計(jì)算的結(jié)果,通過Future接口提供的方法,可以很方便的查詢異步計(jì)算任務(wù)是否執(zhí)行完成,獲取異步計(jì)算的結(jié)果,取消未執(zhí)行的異步任務(wù),或者中斷異步任務(wù)的執(zhí)行,接口定義如下:
publicinterfaceFuture{booleancancel(booleanmayInterruptIfRunning);booleanisCancelled();booleanisDone();Vget()throwsInterruptedException, ExecutionException;Vget(longtimeout, TimeUnit unit)throwsInterruptedException, ExecutionException, TimeoutException;}
cancel(boolean mayInterruptIfRunning):取消子任務(wù)的執(zhí)行,如果這個(gè)子任務(wù)已經(jīng)執(zhí)行結(jié)束,或者已經(jīng)被取消,或者不能被取消,這個(gè)方法就會執(zhí)行失敗并返回false;如果子任務(wù)還沒有開始執(zhí)行,那么子任務(wù)會被取消,不會再被執(zhí)行;如果子任務(wù)已經(jīng)開始執(zhí)行了,但是還沒有執(zhí)行結(jié)束,根據(jù)mayInterruptIfRunning的值,如果mayInterruptIfRunning = true,那么會中斷執(zhí)行任務(wù)的線程,然后返回true,如果參數(shù)為false,會返回true,不會中斷執(zhí)行任務(wù)的線程。這個(gè)方法在FutureTask的實(shí)現(xiàn)中有很多值得關(guān)注的地方,后面再細(xì)說。
需要注意,這個(gè)方法執(zhí)行結(jié)束,返回結(jié)果之后,再調(diào)用isDone()會返回true。
isCancelled(),判斷任務(wù)是否被取消,如果任務(wù)執(zhí)行結(jié)束(正常執(zhí)行結(jié)束和發(fā)生異常結(jié)束,都算執(zhí)行結(jié)束)前被取消,也就是調(diào)用了cancel()方法,并且cancel()返回true,則該方法返回true,否則返回false.
isDone():判斷任務(wù)是否執(zhí)行結(jié)束,正常執(zhí)行結(jié)束,或者發(fā)生異常結(jié)束,或者被取消,都屬于結(jié)束,該方法都會返回true.
V get():獲取結(jié)果,如果這個(gè)計(jì)算任務(wù)還沒有執(zhí)行結(jié)束,該調(diào)用線程會進(jìn)入阻塞狀態(tài)。如果計(jì)算任務(wù)已經(jīng)被取消,調(diào)用get()會拋出CancellationException,如果計(jì)算過程中拋出異常,該方法會拋出ExecutionException,如果當(dāng)前線程在阻塞等待的時(shí)候被中斷了,該方法會拋出InterruptedException。
V get(long timeout, TimeUnit unit):帶超時(shí)限制的get(),等待超時(shí)之后,該方法會拋出TimeoutException。
FutureTask
FutureTask可以像Runnable一下,封裝異步任務(wù),然后提交給Thread或線程池執(zhí)行,然后獲取任務(wù)執(zhí)行結(jié)果。原因在于FutureTask實(shí)現(xiàn)了RunnableFuture接口,RunnableFuture是什么呢,其實(shí)就是Runnable和Callable的結(jié)合,它繼承自Runnable和Callable。繼承關(guān)系如下:
publicclassFutureTaskimplementsRunnableFuture{publicinterfaceRunnableFutureextendsRunnable,Future{
FutureTask使用
FutureTask + Thread
上面介紹過,F(xiàn)utureTask有Runnable接口和Callable接口的特征,可以被Thread執(zhí)行。
//step1:封裝一個(gè)計(jì)算任務(wù),實(shí)現(xiàn)Callable接口? classTaskimplementsCallable{@OverridepublicBooleancall()throwsException{try{for(inti =0; i <10; i++) {? ? ? ? ? ? ? ? Log.d(TAG,"task......."+ Thread.currentThread().getName() +"...i = "+ i);//模擬耗時(shí)操作Thread.sleep(100);? ? ? ? ? ? }? ? ? ? }catch(InterruptedException e) {? ? ? ? ? ? Log.e(TAG," is interrupted when calculating, will stop...");returnfalse;// 注意這里如果不return的話,線程還會繼續(xù)執(zhí)行,所以任務(wù)超時(shí)后在這里處理結(jié)果然后返回}returntrue;? ? }}//step2:創(chuàng)建計(jì)算任務(wù),作為參數(shù),傳入FutureTaskTask task =newTask();FutureTask futureTask =newFutureTask(task);//step3:將FutureTask提交給Thread執(zhí)行Thread thread1 =newThread(futureTask);thread1.setName("task thread 1");thread1.start();//step4:獲取執(zhí)行結(jié)果,由于get()方法可能會阻塞當(dāng)前調(diào)用線程,如果子任務(wù)執(zhí)行時(shí)間不確定,最好在子線程中獲取執(zhí)行結(jié)果try{// boolean result = (boolean) futureTask.get();booleanresult = (boolean) futureTask.get(5, TimeUnit.SECONDS);? ? Log.d(TAG,"result:"+ result);}catch(InterruptedException e) {? ? Log.e(TAG,"守護(hù)線程阻塞被打斷...");? ? e.printStackTrace();}catch(ExecutionException e) {? ? Log.e(TAG,"執(zhí)行任務(wù)時(shí)出錯...");? ? e.printStackTrace();}catch(TimeoutException e) {? ? Log.e(TAG,"執(zhí)行超時(shí)...");? ? futureTask.cancel(true);? ? e.printStackTrace();}catch(CancellationException e) {//如果線程已經(jīng)cancel了,再執(zhí)行g(shù)et操作會拋出這個(gè)異常Log.e(TAG,"future已經(jīng)cancel了...");? ? e.printStackTrace();}
Future + ExecutorService
//step1 ......//step2:創(chuàng)建計(jì)算任務(wù)Task task =newTask();//step3:創(chuàng)建線程池,將Callable類型的task提交給線程池執(zhí)行,通過Future獲取子任務(wù)的執(zhí)行結(jié)果ExecutorService executorService = Executors.newCachedThreadPool();finalFuture future = executorService.submit(task);//step4:通過future獲取執(zhí)行結(jié)果booleanresult = (boolean) future.get();
FutureTask + ExecutorService
//step1 ......//step2 ......//step3:將FutureTask提交給線程池執(zhí)行ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(futureTask);//step4 ......
2. 開發(fā)中我遇到的問題。
FutureTask使用還是比較簡單的,F(xiàn)utureTask與Runnable,最大的區(qū)別有兩個(gè),一個(gè)是可以獲取執(zhí)行結(jié)果,另一個(gè)是可以取消,使用方法可以參考以上步驟,不過我在項(xiàng)目中使用FutureTask出現(xiàn)了以下兩個(gè)問題:
有的情況下,使用futuretask.cancel(true)方法并不能真正的結(jié)束子任務(wù)執(zhí)行。
FutureTask的get(long timeout, TimeUnit unit)方法,是等待timeout時(shí)間后,獲取子線程的執(zhí)行結(jié)果,但是如果子任務(wù)執(zhí)行結(jié)束了,但是超時(shí)時(shí)間還沒有到,這個(gè)方法也會返回結(jié)果。
3. 結(jié)合FutureTask的源碼分析問題。
成員變量
下面,結(jié)合FutureTask的源碼,分析一下以上兩個(gè)問題。在此之前,先看一下FutureTask內(nèi)部比較值得關(guān)注的幾個(gè)成員變量。
private volatile int state,state用來標(biāo)識當(dāng)前任務(wù)的運(yùn)行狀態(tài)。FutureTask的所有方法都是圍繞這個(gè)狀態(tài)進(jìn)行的,需要注意,這個(gè)值用volatile(易變的)來標(biāo)記,如果有多個(gè)子線程在執(zhí)行FutureTask,那么它們看到的都會是同一個(gè)state,有如下幾個(gè)值:
privatevolatileintstate;privatestaticfinalintNEW? ? ? ? ? =0;privatestaticfinalintCOMPLETING? =1;privatestaticfinalintNORMAL? ? ? =2;privatestaticfinalintEXCEPTIONAL? =3;privatestaticfinalintCANCELLED? ? =4;privatestaticfinalintINTERRUPTING =5;privatestaticfinalintINTERRUPTED? =6;
NEW:表示這是一個(gè)新的任務(wù),或者還沒有執(zhí)行完的任務(wù),是初始狀態(tài)。
COMPLETING:表示任務(wù)執(zhí)行結(jié)束(正常執(zhí)行結(jié)束,或者發(fā)生異常結(jié)束),但是還沒有將結(jié)果保存到outcome中。是一個(gè)中間狀態(tài)。
NORMAL:表示任務(wù)正常執(zhí)行結(jié)束,并且已經(jīng)把執(zhí)行結(jié)果保存到outcome字段中。是一個(gè)最終狀態(tài)。
EXCEPTIONAL:表示任務(wù)發(fā)生異常結(jié)束,異常信息已經(jīng)保存到outcome中,這是一個(gè)最終狀態(tài)。
CANCELLED:任務(wù)在新建之后,執(zhí)行結(jié)束之前被取消了,但是不要求中斷正在執(zhí)行的線程,也就是調(diào)用了cancel(false),任務(wù)就是CANCELLED狀態(tài),這時(shí)任務(wù)狀態(tài)變化是NEW -> CANCELLED。
INTERRUPTING:任務(wù)在新建之后,執(zhí)行結(jié)束之前被取消了,并要求中斷線程的執(zhí)行,也就是調(diào)用了cancel(true),這時(shí)任務(wù)狀態(tài)就是INTERRUPTING。這是一個(gè)中間狀態(tài)。
INTERRUPTED:調(diào)用cancel(true)取消異步任務(wù),會調(diào)用interrupt()中斷線程的執(zhí)行,然后狀態(tài)會從INTERRUPTING變到INTERRUPTED。
狀態(tài)變化有如下4種情況:
NEW -> COMPLETING -> NORMAL? ---------------------------------------? 正常執(zhí)行結(jié)束的流程
NEW -> COMPLETING -> EXCEPTIONAL? ---------------------執(zhí)行過程中出現(xiàn)異常的流程
NEW -> CANCELLED -------------------------------------------被取消,即調(diào)用了cancel(false)
NEW -> INTERRUPTING -> INTERRUPTED? -------------被中斷,即調(diào)用了cancel(true)
private Callable<V> callable,一個(gè)Callable類型的變量,封裝了計(jì)算任務(wù),可獲取計(jì)算結(jié)果。從上面的用法中可以看到,F(xiàn)utureTask的構(gòu)造函數(shù)中,我們傳入的就是實(shí)現(xiàn)了Callable的接口的計(jì)算任務(wù)。
private Object outcome,Object類型的變量outcome,用來保存計(jì)算任務(wù)的返回結(jié)果,或者執(zhí)行過程中拋出的異常。
private volatile Thread runner,指向當(dāng)前在運(yùn)行Callable任務(wù)的線程,runner在FutureTask中的賦值變化很值得關(guān)注,后面源碼會詳細(xì)介紹這個(gè)。
private volatile WaitNode waiters,WaitNode是FutureTask的內(nèi)部類,表示一個(gè)阻塞隊(duì)列,如果任務(wù)還沒有執(zhí)行結(jié)束,那么調(diào)用get()獲取結(jié)果的線程會阻塞,在這個(gè)阻塞隊(duì)列中排隊(duì)等待。
成員函數(shù)
下面從構(gòu)造函數(shù)說起,看一下FutureTask的源碼。
1. 構(gòu)造函數(shù)
publicFutureTask(Callable<V> callable){if(callable ==null)thrownewNullPointerException();this.callable = callable;this.state = NEW;// ensure visibility of callable}
FutureTask的第一個(gè)構(gòu)造函數(shù),參數(shù)是Callable類型的變量。將傳入的參數(shù)賦值給this.callable,然后設(shè)置state狀態(tài)為NEW,表示這是新任務(wù)。
publicFutureTask(Runnable runnable, V result){this.callable = Executors.callable(runnable, result);this.state = NEW;// ensure visibility of callable}
FutureTask還有一個(gè)構(gòu)造函數(shù),接收Runnable類型的參數(shù),通過Executors.callable(runnable, result)將傳入的Runnable和result轉(zhuǎn)換成Callable類型。使用該構(gòu)造方法,可以定制返回結(jié)果。
publicstaticCallablecallable(Runnable task, T result){if(task ==null)thrownewNullPointerException();returnnewRunnableAdapter(task, result);}
可以看一下Executors.callable(runnable, result)方法,這里通過適配器模式進(jìn)行適配,創(chuàng)建一個(gè)RunnableAdapter適配器。
privatestaticfinalclassRunnableAdapterimplementsCallable{privatefinalRunnable task;privatefinalT result;? ? RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;? ? }publicTcall(){? ? ? ? task.run();returnresult;? ? }}
RunnableAdapter是Executors的內(nèi)部類,實(shí)現(xiàn)也比較簡單,實(shí)現(xiàn)了適配對象Callable接口,在call()方法中執(zhí)行Runnable的run(),然后返回result。
2. 任務(wù)被執(zhí)行——run()
FutureTask封裝了計(jì)算任務(wù),無論是提交給Thread執(zhí)行,或者線程池執(zhí)行,調(diào)用的都是FutureTask的run()。
publicvoidrun(){//1.判斷狀態(tài)是否是NEW,不是NEW,說明任務(wù)已經(jīng)被其他線程執(zhí)行,甚至執(zhí)行結(jié)束,或者被取消了,直接返回//2.調(diào)用CAS方法,判斷RUNNER為null的話,就將當(dāng)前線程保存到RUNNER中,設(shè)置RUNNER失敗,就直接返回if(state != NEW ||? ? ? ? ? ? !U.compareAndSwapObject(this, RUNNER,null, Thread.currentThread()))return;try{? ? ? ? Callable c = callable;if(c !=null&& state == NEW) {? ? ? ? ? ? V result;booleanran;try{//3.執(zhí)行Callable任務(wù),結(jié)果保存到result中result = c.call();? ? ? ? ? ? ? ? ran =true;? ? ? ? ? ? }catch(Throwable ex) {//3.1 如果執(zhí)行任務(wù)過程中發(fā)生異常,將調(diào)用setException()設(shè)置異常result =null;? ? ? ? ? ? ? ? ran =false;? ? ? ? ? ? ? ? setException(ex);? ? ? ? ? ? }//3.2 任務(wù)正常執(zhí)行結(jié)束調(diào)用set(result)保存結(jié)果if(ran)? ? ? ? ? ? ? ? set(result);? ? ? ? }? ? }finally{// runner must be non-null until state is settled to// prevent concurrent calls to run()//4. 任務(wù)執(zhí)行結(jié)束,runner設(shè)置為null,表示當(dāng)前沒有線程在執(zhí)行這個(gè)任務(wù)了runner =null;// state must be re-read after nulling runner to prevent// leaked interrupts//5. 讀取狀態(tài),判斷是否在執(zhí)行的過程中,被中斷了,如果被中斷,處理中斷ints = state;if(s >= INTERRUPTING)? ? ? ? ? ? handlePossibleCancellationInterrupt(s);? ? }}
首先,判斷state的值是不是NEW,如果不是NEW,說明線程已經(jīng)被執(zhí)行了,可能已經(jīng)執(zhí)行結(jié)束,或者被取消了,直接返回。
這里其實(shí)是調(diào)用了Unsafe的CAS方法,讀取并設(shè)置runner的值,將當(dāng)前線程保存到runner中,表示當(dāng)前正在執(zhí)行任務(wù)的線程??梢钥吹剑@里設(shè)置的其實(shí)是RUNNER,和前面介紹的Thread類型的runner變量不一樣的,那為什么還說設(shè)置的是runner的值?RUNNER在FutureTask中定義如下:
privatestaticfinallongRUNNER;//RUNNER是一個(gè)long類型的變量,指向runner字段的偏移地址,相當(dāng)于指針RUNNER = U.objectFieldOffset? ? ? ? (FutureTask.class.getDeclaredField("runner"));
關(guān)于Unsafe的CAS方法,簡單介紹一下,它提供了一種對runner進(jìn)行原子操作的方法,原子操作,意味著,這個(gè)操作不會被打斷。runner被volatile字段修飾,只能保證,當(dāng)多個(gè)子線程在執(zhí)行FutureTask的時(shí)候,它們讀取到的runner的值是同一個(gè),但是不能保證原子操作,所以很容易讀到臟數(shù)據(jù)(舉個(gè)例子:線程A準(zhǔn)備對runner進(jìn)行讀和寫操作,讀取到runner的值為null,這是,cpu切換執(zhí)行線程B,線程B讀取到runner的值也是null,然后又切換到線程A執(zhí)行,線程A對runner賦值thread-A,此時(shí)runner的值已經(jīng)不再是null,線程B讀取到的runner=null就是臟數(shù)據(jù)),用Unsafe的CAS方法,來對runner進(jìn)行讀寫,就能保證原子操作。多個(gè)線程訪問run()方法時(shí),會在這里同步。
讀取callable變量,執(zhí)行call(),并獲取執(zhí)行結(jié)果。
如果執(zhí)行call()的過程中發(fā)生異常,就調(diào)用setException()設(shè)置異常,setException()定義如下:
protectedvoidsetException(Throwable t){if(U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {? ? ? ? outcome = t;? ? ? ? U.putOrderedInt(this, STATE, EXCEPTIONAL);// final statefinishCompletion();? ? }}//a. 調(diào)用Unsafe的CAS方法,state從NEW --> COMPLETING,這里的STATE和上面的RUNNER定義類似,指向state字段的偏移地址。//b. 將異常信息保存到outcome字段,state變成EXCEPTIONAL。//c. 調(diào)用finishCompletion()。//NEW --> COMPLETING --> EXCEPTIONAL。
如果任務(wù)正常執(zhí)行結(jié)束,就調(diào)用set(result)保存結(jié)果,定義如下:
protectedvoidset(V v){if(U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {? ? ? ? outcome = v;? ? ? ? U.putOrderedInt(this, STATE, NORMAL);// final statefinishCompletion();? ? }}//a. 和setException()類似,state從NEW --> COMPLETING。//b. 將正常執(zhí)行的結(jié)果result保存到outcome,state變成NORMAL。//c. 調(diào)用finishCompletion()。NEW --> COMPLETING --> NORMAL。
任務(wù)執(zhí)行結(jié)束,runner設(shè)置為null,表示當(dāng)前沒有線程在執(zhí)行這個(gè)任務(wù)了。
讀取state狀態(tài),判斷是否在執(zhí)行的過程中被中斷了,如果被中斷,處理中斷,看一下這個(gè)中斷處理:
privatevoidhandlePossibleCancellationInterrupt(ints){// It is possible for our interrupter to stall before getting a// chance to interrupt us.? Let's spin-wait patiently.if(s == INTERRUPTING)while(state == INTERRUPTING)? ? ? ? ? ? Thread.yield();// wait out pending interrupt}
如果狀態(tài)是INTERRUPTING,表示正在被中斷,這時(shí)就讓出線程的執(zhí)行權(quán),給其他線程來執(zhí)行。
3. 獲取任務(wù)的執(zhí)行結(jié)果——get()
一般情況下,執(zhí)行任務(wù)的線程和獲取結(jié)果的線程不會是同一個(gè),當(dāng)我們在主線程或者其他線程中,獲取計(jì)算任務(wù)的結(jié)果時(shí),就會調(diào)用get方法,如果這時(shí)計(jì)算任務(wù)還沒有執(zhí)行完成,調(diào)用get()的線程就會阻塞等待。get()實(shí)現(xiàn)如下:
publicVget()throwsInterruptedException, ExecutionException{ints = state;if(s <= COMPLETING)? ? ? ? s = awaitDone(false,0L);returnreport(s);}
讀取任務(wù)的執(zhí)行狀態(tài) state ,如果state <= COMPLETING,說明線程還沒有執(zhí)行完(run()中可以看到,只有任務(wù)執(zhí)行結(jié)束,或者發(fā)生異常的時(shí)候,state才會被設(shè)置成COMPLETING)。
調(diào)用awaitDone(false, 0L),進(jìn)入阻塞狀態(tài)。看一下awaitDone(false, 0L)的實(shí)現(xiàn):
privateintawaitDone(booleantimed,longnanos)throwsInterruptedException{longstartTime =0L;// Special value 0L means not yet parkedWaitNode q =null;booleanqueued =false;for(;;) {//1. 讀取狀態(tài)//1.1 如果s > COMPLETING,表示任務(wù)已經(jīng)執(zhí)行結(jié)束,或者發(fā)生異常結(jié)束了,就不會阻塞,直接返回ints = state;if(s > COMPLETING) {if(q !=null)? ? ? ? ? ? ? ? q.thread =null;returns;? ? ? ? }//1.2 如果s == COMPLETING,表示任務(wù)結(jié)束(正常/異常),但是結(jié)果還沒有保存到outcome字段,當(dāng)前線程讓出執(zhí)行權(quán),給其他線程先執(zhí)行elseif(s == COMPLETING)// We may have already promised (via isDone) that we are done// so never return empty-handed or throw InterruptedExceptionThread.yield();//2. 如果調(diào)用get()的線程被中斷了,就從等待的線程棧中移除這個(gè)等待節(jié)點(diǎn),然后拋出中斷異常elseif(Thread.interrupted()) {? ? ? ? ? ? removeWaiter(q);thrownewInterruptedException();? ? ? ? }//3. 如果等待節(jié)點(diǎn)q=null,就創(chuàng)建一個(gè)等待節(jié)點(diǎn)elseif(q ==null) {if(timed && nanos <=0L)returns;? ? ? ? ? ? q =newWaitNode();? ? ? ? }//4. 如果這個(gè)等待節(jié)點(diǎn)還沒有加入等待隊(duì)列,就加入隊(duì)列頭elseif(!queued)? ? ? ? ? ? queued = U.compareAndSwapObject(this, WAITERS,? ? ? ? ? ? ? ? ? ? q.next = waiters, q);//5. 如果設(shè)置了超時(shí)等待時(shí)間elseif(timed) {//5.1 設(shè)置startTime,用于計(jì)算超時(shí)時(shí)間,如果超時(shí)時(shí)間到了,就等待隊(duì)列中移除當(dāng)前節(jié)點(diǎn)finallongparkNanos;if(startTime ==0L) {// first timestartTime = System.nanoTime();if(startTime ==0L)? ? ? ? ? ? ? ? ? ? startTime =1L;? ? ? ? ? ? ? ? parkNanos = nanos;? ? ? ? ? ? }else{longelapsed = System.nanoTime() - startTime;if(elapsed >= nanos) {? ? ? ? ? ? ? ? ? ? removeWaiter(q);returnstate;? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? parkNanos = nanos - elapsed;? ? ? ? ? ? }// nanoTime may be slow; recheck before parking//5.2 如果超時(shí)時(shí)間還沒有到,而且任務(wù)還沒有結(jié)束,就阻塞特定時(shí)間if(state < COMPLETING)? ? ? ? ? ? ? ? LockSupport.parkNanos(this, parkNanos);? ? ? ? }//6. 阻塞,等待喚醒elseLockSupport.park(this);? ? }}
這里主要有幾個(gè)步驟:
a. 讀取state,如果s > COMPLETING,表示任務(wù)已經(jīng)執(zhí)行結(jié)束,或者發(fā)生異常結(jié)束了,此時(shí),調(diào)用get()的線程就不會阻塞;如果s == COMPLETING,表示任務(wù)結(jié)束(正常/異常),但是結(jié)果還沒有保存到outcome字段,當(dāng)前線程讓出執(zhí)行權(quán),給其他線程先執(zhí)行。
b. 判斷Thread.interrupted(),如果調(diào)用get()的線程被中斷了,就從等待的線程棧(其實(shí)就是一個(gè)WaitNode節(jié)點(diǎn)隊(duì)列或者說是棧)中移除這個(gè)等待節(jié)點(diǎn),然后拋出中斷異常。
c. 判斷q == null,如果等待節(jié)點(diǎn)q為null,就創(chuàng)建等待節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)后面會被插入阻塞隊(duì)列。
d. 判斷queued,這里是將c中創(chuàng)建節(jié)點(diǎn)q加入隊(duì)列頭。使用Unsafe的CAS方法,對waiters進(jìn)行賦值,waiters也是一個(gè)WaitNode節(jié)點(diǎn),相當(dāng)于隊(duì)列頭,或者理解為隊(duì)列的頭指針。通過WaitNode可以遍歷整個(gè)阻塞隊(duì)列。
e. 之后,判斷timed,這是從get()傳入的值,表示是否設(shè)置了超時(shí)時(shí)間。設(shè)置超時(shí)時(shí)間之后,調(diào)用get()的線程最多阻塞nanos,就會從阻塞狀態(tài)醒過來。如果沒有設(shè)置超時(shí)時(shí)間,就直接進(jìn)入阻塞狀態(tài),等待被其他線程喚醒。
awaitDone()方法內(nèi)部有一個(gè)無限循環(huán),看似有很多判斷,比較難理解,其實(shí)這個(gè)循環(huán)最多循環(huán)3次。
假設(shè)Thread A執(zhí)行了get()獲取計(jì)算任務(wù)執(zhí)行結(jié)果,但是子任務(wù)還沒有執(zhí)行完,而且Thread A沒有被中斷,它會進(jìn)行以下步驟。
step1:Thread A執(zhí)行了awaitDone(),1,2兩次判斷都不成立,Thread A判斷q=null,會創(chuàng)建一個(gè)WaitNode節(jié)點(diǎn)q,然后進(jìn)入第二次循環(huán)。
step2:第二次循環(huán),判斷4不成立,此時(shí)將step1創(chuàng)建的節(jié)點(diǎn)q加入隊(duì)列頭。
step3:第三次循環(huán),判斷是否設(shè)置了超時(shí)時(shí)間,如果設(shè)置了超時(shí)時(shí)間,就阻塞特定時(shí)間,否則,一直阻塞,等待被其他線程喚醒。
從awaitDone()返回,最后調(diào)用report(int s),這個(gè)后面再介紹。
4. 取消任務(wù)——cancel(boolean mayInterruptIfRunning)
通常調(diào)用cancel()的線程和執(zhí)行子任務(wù)的線程不會是同一個(gè)。當(dāng)FutureTask的cancel(boolean mayInterruptIfRunning)方法被調(diào)用時(shí),如果子任務(wù)還沒有執(zhí)行,那么這個(gè)任務(wù)就不會執(zhí)行了,如果子任務(wù)已經(jīng)執(zhí)行,且mayInterruptIfRunning=true,那么執(zhí)行子任務(wù)的線程會被中斷(注意:這里說的是線程被中斷,不是任務(wù)被取消),下面看一下這個(gè)方法的實(shí)現(xiàn):
publicbooleancancel(booleanmayInterruptIfRunning){//1.判斷state是否為NEW,如果不是NEW,說明任務(wù)已經(jīng)結(jié)束或者被取消了,該方法會執(zhí)行返回false//state=NEW時(shí),判斷mayInterruptIfRunning,如果mayInterruptIfRunning=true,說明要中斷任務(wù)的執(zhí)行,NEW->INTERRUPTING//如果mayInterruptIfRunning=false,不需要中斷,狀態(tài)改為CANCELLEDif(!(state == NEW &&? ? ? ? ? ? U.compareAndSwapInt(this, STATE, NEW,? ? ? ? ? ? ? ? ? ? mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))returnfalse;try{// in case call to interrupt throws exceptionif(mayInterruptIfRunning) {try{//2.讀取當(dāng)前正在執(zhí)行子任務(wù)的線程runner,調(diào)用t.interrupt(),中斷線程執(zhí)行Thread t = runner;if(t !=null)? ? ? ? ? ? ? ? ? ? t.interrupt();? ? ? ? ? ? }finally{// final state//3.修改狀態(tài)為INTERRUPTEDU.putOrderedInt(this, STATE, INTERRUPTED);? ? ? ? ? ? }? ? ? ? }? ? }finally{? ? ? ? finishCompletion();? ? }returntrue;}
cancel()分析:
判斷state,保證state = NEW才能繼續(xù)cancel()的后續(xù)操作。state=NEW且mayInterruptIfRunning=true,說明要中斷任務(wù)的執(zhí)行,此時(shí),NEW->INTERRUPTING。然后讀取當(dāng)前執(zhí)行任務(wù)的線程runner,調(diào)用t.interrupt(),中斷線程執(zhí)行,NEW->INTERRUPTING->INTERRUPTED,最后調(diào)用finishCompletion()。
如果NEW->INTERRUPTING,那么cancel()方法,只是修改了狀態(tài),NEW->CANCELLED,然后直接調(diào)用finishCompletion()。
所以cancel(true)方法,只是調(diào)用t.interrupt(),此時(shí),如果t因?yàn)閟leep(),wait()等方法進(jìn)入阻塞狀態(tài),那么阻塞的地方會拋出InterruptedException;如果線程正常運(yùn)行,需要結(jié)合Thread的interrupted()方法進(jìn)行判斷,才能結(jié)束,否則,cancel(true)不能結(jié)束正在執(zhí)行的任務(wù)。
這也就可以解釋前面我遇到的問題,有的情況下,使用 futuretask.cancel(true)方法并不能真正的結(jié)束子任務(wù)執(zhí)行。
5. 子線程返回結(jié)果前的最后一步——finishCompletion()
前面多次出現(xiàn)過這個(gè)方法,set(V v)(保存執(zhí)行結(jié)果,設(shè)置狀態(tài)為NORMAL),setException(Throwable t)(保存結(jié)果,設(shè)置狀態(tài)為EXCEPTIONAL)和cancel(boolean mayInterruptIfRunning)(設(shè)置狀態(tài)為CANCELLED/INTERRUPTED),該方法在state變成最終態(tài)之后,會被調(diào)用。
privatevoidfinishCompletion(){// assert state > COMPLETING;for(WaitNode q; (q = waiters) !=null;) {if(U.compareAndSwapObject(this, WAITERS, q,null)) {for(;;) {? ? ? ? ? ? ? ? Thread t = q.thread;if(t !=null) {? ? ? ? ? ? ? ? ? ? q.thread =null;? ? ? ? ? ? ? ? ? ? LockSupport.unpark(t);? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? WaitNode next = q.next;if(next ==null)break;? ? ? ? ? ? ? ? q.next =null;// unlink to help gcq = next;? ? ? ? ? ? }break;? ? ? ? }? ? }? ? done();? ? callable =null;// to reduce footprint}
finishCompletion()主要做了三件事情:
遍歷waiters等待隊(duì)列,調(diào)用LockSupport.unpark(t)喚醒等待返回結(jié)果的線程,釋放資源。
調(diào)用done(),這個(gè)方法什么都沒有做,不過子類可以實(shí)現(xiàn)這個(gè)方法,做一些額外的操作。
設(shè)置callable為null,callable是FutureTask封裝的任務(wù),任務(wù)執(zhí)行完,釋放資源。
這里可以解答上面的第二個(gè)問題了。FutureTask的get(long timeout, TimeUnit unit)方法,表示阻塞timeout時(shí)間后,獲取子線程的執(zhí)行結(jié)果,但是如果子任務(wù)執(zhí)行結(jié)束了,但是超時(shí)時(shí)間還沒有到,這個(gè)方法也會返回結(jié)果。因?yàn)槿蝿?wù)執(zhí)行完之后,會遍歷阻塞隊(duì)列,喚醒阻塞的線程。LockSupport.unpark(t)執(zhí)行之后,阻塞的線程會從LockSupport.park(this)/LockSupport.parkNanos(this, parkNanos)醒來,然后會繼續(xù)進(jìn)入awaitDone(boolean timed, long nanos)的while循環(huán),此時(shí),state >= COMPLETING,然后從awaitDone()返回。此時(shí),get()/get(long timeout, TimeUnit unit)會繼續(xù)執(zhí)行,return report(s),上面介紹get()的時(shí)候沒介紹的方法??匆幌聄eport(int s):
privateVreport(ints)throwsExecutionException{? ? Object x = outcome;if(s == NORMAL)return(V)x;if(s >= CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}
其實(shí)就是讀取outcome,將state映射到最后返回的結(jié)果中,s == NORMAL說明任務(wù)正常結(jié)束,返回正常結(jié)果,s >= CANCELLED,就拋出CancellationException。
6.其他方法
FutureTask的還有兩個(gè)方法isCancelled()和isDone(),其實(shí)就是判斷state,沒有過多的步驟。
publicbooleanisCancelled(){returnstate >= CANCELLED;}publicbooleanisDone(){returnstate != NEW;}
總結(jié)
到此FutureTask分析完畢,其中感受最深的是Unsafe的用法,對于多線程共享的對象,采用volatile+Unsafe的方法,代替鎖操作,進(jìn)行同步;其次,是LockSupport的park(Object blocker)和unpark(Thread thread)的使用
park(Object blocker):線程進(jìn)入阻塞狀態(tài),告訴線程調(diào)度,當(dāng)前線程不可用,直到線程再次獲取permit(允許);如果在調(diào)用park(Object blocker)之前,線程已經(jīng)獲得了permit(比如說,已經(jīng)調(diào)用了unpark(t)),那么該方法會返回。
unpark(Thread thread):使得傳入的線程再次獲得permit.這里的permit可以理解為一個(gè)信號量。
LockSupport在這里的作用,類似于wait(),notify()/notifyAll(),關(guān)于二者的區(qū)別,可以看一下
Java的LockSupport.park()實(shí)現(xiàn)分析。
作者:zero_sr
鏈接:http://m.itdecent.cn/p/55221d045f39
來源:簡書
簡書著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請聯(lián)系作者獲得授權(quán)并注明出處。