AsyncTask 是一個安卓開發(fā)中很常用的類,畢竟有太多地方需要做異步處理,而 AsyncTask 做了很好的封裝。正好最近在看 Java 并發(fā),看了下 AsyncTask的源碼,里面也涉及到了這部分的內(nèi)容,正好一起學(xué)習(xí)一下。
本文先從 AsyncTask 中涉及到的并發(fā)的基礎(chǔ)部分講起,之后再分析 AsyncTask 源碼,這樣看起來會清晰很多。本文不會介紹AsyncTask的基本用法,只講底層實現(xiàn)。會包括以下內(nèi)容:
- Executor
- Callable、Future 和 FutureTask
- AsyncTask 源碼分析
注:下面源碼中出現(xiàn)的 Params, Progress, Result,Data都是泛型參數(shù),注意一下,以免引起理解困難
1. Excutor
開啟一個子線程最簡單也最常見的方法就是用 Thread 去驅(qū)動一個 Runnable 對象。這種方法大家都很熟悉了這里就不再舉例。但其實 Java 類庫中還提供了一種更加強大的開啟多線程任務(wù)的方法,那就是 Excutor。從這個名字就可以直觀的理解,它是一個用來執(zhí)行任務(wù)的工具。
public interface Executor {
void execute(Runnable command);
}
Executor 是一個只有一個方法的接口。一般使用的是 ExecutorService ,是一個繼承了 Executor 的接口,文檔中對 ExecutorService 的概述是這樣的:
An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.
簡單的說就是一個帶有生命周期管理的 Executor,并且可以追蹤其執(zhí)行的任務(wù)的狀態(tài)。這里涉及到 Future 和 Callable 兩個接口,稍后會介紹。
說了這么半天,那么實現(xiàn)類怎么來?當(dāng)然不用自己寫了,Java 已經(jīng)提供了一個工廠類 Excutors,可以方便的構(gòu)造出多種 ExecutorService 的實現(xiàn)類,滿足各種場景的應(yīng)用。這里列舉幾個 Excutors 里常用的靜態(tài)方法:
public static ExecutorService newCachedThreadPool()public static ExecutorService newFixedThreadPool(int nThreads)public static ExecutorService newSingleThreadExecutor()
從函數(shù)名就可以猜出來,這三個方法構(gòu)造的 ExecutorService 都維護(hù)著一個線程池,CachedThreadPool 線程的數(shù)量沒有上限,FixedThreadPool的線程數(shù)量的上限由用戶指定,而 SingleThreadExecutor 的線程數(shù)量為1,即所有任務(wù)都串行執(zhí)行。
查看源碼可以發(fā)現(xiàn)這幾個工廠方法最終都調(diào)用了 ThreadPoolExecutor 這個類的構(gòu)造方法,只是傳入的不同的配置參數(shù),從而構(gòu)造出了具有不同特性的線程池。
構(gòu)造好了線程池,接下來的事情就簡單了,直接把 Runnable 對象丟進(jìn)去,即調(diào)用
void execute(Runnable command);
任務(wù)就會自動在子線程執(zhí)行了,簡直不能更簡單了。
總結(jié)一下就是,通過 Excutors 類的工廠方法獲得一個 ExcutorService 的實例,然后調(diào)用 ExecutorService 的 execute 方法執(zhí)行 Runnable 任務(wù)。
2. Callable 、Future 以及 FutureTask
上一節(jié)介紹的 execute 方法,雖然簡單好用,但是有一個缺點就是任務(wù)執(zhí)行一旦開始,就無法獲知任務(wù)的執(zhí)行情況,也無法在任務(wù)執(zhí)行完成后得到一個返回值。要想實現(xiàn)這樣的功能,就需要 Callable 和 Future 了。
Callable 和 Runnable 很像,也是一個接口:
public interface Callable<V> {
V call() throws Exception;
}
除了函數(shù)名的變化,主要的不同是多了一個泛型返回,這個就是我們希望得到的返回值的類型。那么要執(zhí)行一個實現(xiàn)了 Callable 的任務(wù), ExecutorService 里面也有對應(yīng)的方法:
<T> Future<T> submit(Callable<T> task);
當(dāng)調(diào)用 submit 方法后,我們會得到一個 Future 對象。準(zhǔn)確的來說,Future也是個接口,它定義了幾個方法,用來查詢當(dāng)前任務(wù)執(zhí)行的狀態(tài),以及控制任務(wù)的執(zhí)行:
-
boolean cancel( boolean mayInterruptIfRunning):用來取消任務(wù),取消成功返回true,否則返回false 。mayInterruptIfRunning表示是否可以中斷正在執(zhí)行的任務(wù)。 -
boolean isCancel(): 查詢?nèi)蝿?wù)是否在完成前被成功取消 -
boolean isDone():查詢?nèi)蝿?wù)是否執(zhí)行完成 -
T get():得到任務(wù)的返回值,需要注意如果任務(wù)還沒完成,那么get()方法將會阻塞,直到任務(wù)完成。
Future 接口最常用的一個子類是 FutureTask,后面在AsyncTask 源碼分析時會見到。FutureTask 不僅實現(xiàn)了 Future 接口,也實現(xiàn)了 Runnable 接口,所以它可以通過 Executor 的 execute 方法執(zhí)行。
3. AsyncTask 初探
基本概念介紹完了,現(xiàn)在可以來看 AsyncTask 了。先介紹幾個重要的成員變量,代碼看上去有點長,不要被嚇到,其實很簡單:
//刪掉了一些暫時無需關(guān)注的成員變量和方法
public abstract class AsyncTask<Params, Progress, Result> {
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
...
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
}
下面逐個說明一下:
-
THREAD_POOL_EXECUTOR: 一個指定了最大線程數(shù)量的 Executor。最大線程數(shù)是MAXIMUM_POOL_SIZE, 可以看到其值取決于 CPU 的核心數(shù),如果是4核,那么最大線程數(shù)就是9個。但是這個線程池并沒有直接用來執(zhí)行異步任務(wù),后面會看到。 -
SERIAL_EXECUTOR: 從名字可以看出是一個串行執(zhí)行任務(wù)的 Executor,其具體實現(xiàn)在由下面的SerialExecutor類定義。實現(xiàn)方式也很簡單。維護(hù)了一個隊列mTasks,mTasks.offer在隊尾插入任務(wù),mTasks.poll()從隊首取出任務(wù)。怎么實現(xiàn)一個任務(wù)執(zhí)行后接著執(zhí)行下一個呢?這里很巧妙的,在插入任務(wù)的時候做了一層包裝,用一個 Runnable 包住一個 Runnable,有點代理模式的感覺。外層的Runnable 在執(zhí)行完內(nèi)層的Runnable之后,最終一定會執(zhí)行 finally 中進(jìn)行取下一個任務(wù)并執(zhí)行的操作,這樣就實現(xiàn)了一個不斷線性執(zhí)行任務(wù)的調(diào)度隊列。 -
sDefaultExecutor: 這個就是默認(rèn)的執(zhí)行異步任務(wù)的 Executor 了,其值被指定為SERIAL_EXECUTOR。所以我們知道了所有的 AsyncTask 最終會按照先后順序依次執(zhí)行,而并不是并發(fā)的執(zhí)行。 -
mWorker: 就是一個 Callable 接口多了一個保存參數(shù)的數(shù)組而已 -
mFuture: 是一個FutureTask 對象, 正是前面提到的 Future 接口的一個實現(xiàn)類
至此,我們看到了前面介紹到的 ExecutorService,Callable,F(xiàn)uture, FutureTask 都已經(jīng)悉數(shù)登場,有了前面兩節(jié)的鋪墊,再看這些類應(yīng)該毫無壓力了。接下來就是如何組織運用它們了。
4. AsyncTask 的構(gòu)造函數(shù)
AsyncTask 的構(gòu)造函數(shù)并不復(fù)雜,但是可以說 AsyncTask 架構(gòu)的關(guān)鍵部分都在這里定義了。上源碼:
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}//構(gòu)造函數(shù)到此結(jié)束,下面只是涉及到的方法和類
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//一個用來封裝數(shù)據(jù)的類,mData 字段,可以封裝任務(wù)的執(zhí)行結(jié)果數(shù)據(jù)
//也可以封裝任務(wù)的執(zhí)行進(jìn)度數(shù)據(jù)
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
可以看到主要是完成 mWorker 和 mFuture 兩個變量的初始化。
-
mWorker:call方法內(nèi)部,調(diào)用了doInBackground方法,這個太熟悉了,就是我們使用AsyncTask必須要實現(xiàn)的方法,執(zhí)行我們需要在子線程執(zhí)行的業(yè)務(wù)邏輯。執(zhí)行完之后的返回值調(diào)用postResult方法。postResult方法里面的操作我們也很熟悉,就是通過一個 Handler 把結(jié)果發(fā)送出去,這個Handler 對象后面再講。注意 mTaskInvoked.set(true),這里mTaskInvoked 是一個提供了原子操作的布爾型對象,用來標(biāo)記任務(wù)已經(jīng)被調(diào)用 -
mFuture:只實現(xiàn)了一個 done 方法,這個方法會在任務(wù)終止(正常執(zhí)行完或被取消)的時候調(diào)用。這里的postResultIfNotInvoked邏輯有點奇怪,明明只要任務(wù)一開始執(zhí)行,mWorker的call方法就會被調(diào)用,mTaskInvoked就肯定被設(shè)置為 true 了,怎么會有等于 false 的情況出現(xiàn)呢?最后答案在這里 ,是為了修復(fù)一個bug,這里就不深究了,我們只要知道絕大多數(shù)情況下,wasTaskInvoked都會被設(shè)為 ture 就好了。
4. AsyncTask 里的 Handler
上一節(jié)中的 postResult 里我們看到了一個 getHandler() 方法,現(xiàn)在就來研究這個 Handler 有什么特別了 (其實真的沒什么特別的...):
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
getHandler() 返回了一個 InternalHandler。InternalHandler 處理兩種消息,一種是任務(wù)執(zhí)行結(jié)果的消息 MESSAGE_POST_RESULT,一種是任務(wù)執(zhí)行進(jìn)度的消息 MESSAGE_POST_PROGRESS。
收到任務(wù)執(zhí)行結(jié)果的消息會調(diào)用 AsyncTask 的 finish 方法,在這個方法里面,我們看到了熟悉的 onPostExecute(Result result) 。而收到任務(wù)執(zhí)行進(jìn)度的消息,會調(diào)用 onProgressUpdate(Progress... values) ,這個消息是在調(diào)用 publishProgress(Progress... values) 方法的時候發(fā)送的。
這里的關(guān)鍵是在 InternalHandler 的構(gòu)造函數(shù)里,傳入的 Looper 是通過 Looper.getMainLooper() 方法得到的,這正是 UI 線程的 Looper,因而也就確保了 onPostExecute 和 onProgressUpdate 方法都在 UI 線程執(zhí)行。
至此,我們已經(jīng)在源碼中見到了通常使用 AsyncTask會用到的方法:
doInBackground 在子線程執(zhí)行異步任務(wù), publishProgress 在子線程發(fā)布任務(wù)執(zhí)行進(jìn)度,onProgressUpdate、onPostExecute 在 UI 線程接收任務(wù)執(zhí)行進(jìn)度和結(jié)果。好像還差了一個沒見到,onPreExecute,別著急,下面馬上就來
5. 最后一步,讓 AsyncTask 運轉(zhuǎn)起來
一切準(zhǔn)備工作已經(jīng)就緒,當(dāng)我們定義好一個 AsyncTask,最終要執(zhí)行的時候,就要調(diào)用 execute 方法:
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
execute 方法會調(diào)用到 executeOnExecutor(Executor exec, Params... params),傳入的 Executor 就是第二節(jié)介紹的 SerialExecutor。在這個方法里看到了 onPreExecute()確實是在任務(wù)執(zhí)行前調(diào)用 。最后exec.execute(mFuture); 終于到了任務(wù)執(zhí)行的地方。
6. 總結(jié) AsyncTask = Executor + Handler
經(jīng)過一番分析,理解了 AsyncTask 的內(nèi)部實現(xiàn),發(fā)現(xiàn)它主要就是封裝了一個可以串行執(zhí)行任務(wù)的 Executor 以及處理線程間消息傳遞的 Handler,極大的簡化了異步任務(wù)的處理。不過這里并沒有涉及到并發(fā)訪問資源的同步問題,因為所有的任務(wù)都被串行化執(zhí)行了。其實在3.0之前的幾個版本里任務(wù)曾經(jīng)是并發(fā)執(zhí)行的,但是這時候我們在 doInBackgroud 里面就要小心了,對于共享的資源訪問必須要考慮同步問題。大概Google考慮到多數(shù)情況下程序員們不會意識到這個問題,所以最終還是改為串行執(zhí)行了吧。