深入學(xué)習(xí)AsyncTask

AsyncTask 是一個安卓開發(fā)中很常用的類,畢竟有太多地方需要做異步處理,而 AsyncTask 做了很好的封裝。正好最近在看 Java 并發(fā),看了下 AsyncTask的源碼,里面也涉及到了這部分的內(nèi)容,正好一起學(xué)習(xí)一下。

本文先從 AsyncTask 中涉及到的并發(fā)的基礎(chǔ)部分講起,之后再分析 AsyncTask 源碼,這樣看起來會清晰很多。本文不會介紹AsyncTask的基本用法,只講底層實現(xiàn)。會包括以下內(nèi)容:

  • Executor
  • Callable、Future 和 FutureTask
  • AsyncTask 源碼分析

注:下面源碼中出現(xiàn)的 Params, Progress, Result,Data都是泛型參數(shù),注意一下,以免引起理解困難

1. Excutor

開啟一個子線程最簡單也最常見的方法就是用 Thread 去驅(qū)動一個 Runnable 對象。這種方法大家都很熟悉了這里就不再舉例。但其實 Java 類庫中還提供了一種更加強大的開啟多線程任務(wù)的方法,那就是 Excutor。從這個名字就可以直觀的理解,它是一個用來執(zhí)行任務(wù)的工具。

public interface Executor {
    void execute(Runnable command);
}

Executor 是一個只有一個方法的接口。一般使用的是 ExecutorService ,是一個繼承了 Executor 的接口,文檔中對 ExecutorService 的概述是這樣的:
An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

簡單的說就是一個帶有生命周期管理的 Executor,并且可以追蹤其執(zhí)行的任務(wù)的狀態(tài)。這里涉及到 Future 和 Callable 兩個接口,稍后會介紹。

說了這么半天,那么實現(xiàn)類怎么來?當(dāng)然不用自己寫了,Java 已經(jīng)提供了一個工廠類 Excutors,可以方便的構(gòu)造出多種 ExecutorService 的實現(xiàn)類,滿足各種場景的應(yīng)用。這里列舉幾個 Excutors 里常用的靜態(tài)方法:

  • public static ExecutorService newCachedThreadPool()
  • public static ExecutorService newFixedThreadPool(int nThreads)
  • public static ExecutorService newSingleThreadExecutor()

從函數(shù)名就可以猜出來,這三個方法構(gòu)造的 ExecutorService 都維護(hù)著一個線程池,CachedThreadPool 線程的數(shù)量沒有上限,FixedThreadPool的線程數(shù)量的上限由用戶指定,而 SingleThreadExecutor 的線程數(shù)量為1,即所有任務(wù)都串行執(zhí)行。
查看源碼可以發(fā)現(xiàn)這幾個工廠方法最終都調(diào)用了 ThreadPoolExecutor 這個類的構(gòu)造方法,只是傳入的不同的配置參數(shù),從而構(gòu)造出了具有不同特性的線程池。

構(gòu)造好了線程池,接下來的事情就簡單了,直接把 Runnable 對象丟進(jìn)去,即調(diào)用

void execute(Runnable command);

任務(wù)就會自動在子線程執(zhí)行了,簡直不能更簡單了。
總結(jié)一下就是,通過 Excutors 類的工廠方法獲得一個 ExcutorService 的實例,然后調(diào)用 ExecutorServiceexecute 方法執(zhí)行 Runnable 任務(wù)。

2. Callable 、Future 以及 FutureTask

上一節(jié)介紹的 execute 方法,雖然簡單好用,但是有一個缺點就是任務(wù)執(zhí)行一旦開始,就無法獲知任務(wù)的執(zhí)行情況,也無法在任務(wù)執(zhí)行完成后得到一個返回值。要想實現(xiàn)這樣的功能,就需要 CallableFuture 了。

CallableRunnable 很像,也是一個接口:

public interface Callable<V> {
    V call() throws Exception;
}

除了函數(shù)名的變化,主要的不同是多了一個泛型返回,這個就是我們希望得到的返回值的類型。那么要執(zhí)行一個實現(xiàn)了 Callable 的任務(wù), ExecutorService 里面也有對應(yīng)的方法:

<T> Future<T> submit(Callable<T> task);

當(dāng)調(diào)用 submit 方法后,我們會得到一個 Future 對象。準(zhǔn)確的來說,Future也是個接口,它定義了幾個方法,用來查詢當(dāng)前任務(wù)執(zhí)行的狀態(tài),以及控制任務(wù)的執(zhí)行:

  • boolean cancel( boolean mayInterruptIfRunning):用來取消任務(wù),取消成功返回true,否則返回false 。mayInterruptIfRunning 表示是否可以中斷正在執(zhí)行的任務(wù)。
  • boolean isCancel(): 查詢?nèi)蝿?wù)是否在完成前被成功取消
  • boolean isDone() :查詢?nèi)蝿?wù)是否執(zhí)行完成
  • T get() :得到任務(wù)的返回值,需要注意如果任務(wù)還沒完成,那么 get() 方法將會阻塞,直到任務(wù)完成。

Future 接口最常用的一個子類是 FutureTask,后面在AsyncTask 源碼分析時會見到。FutureTask 不僅實現(xiàn)了 Future 接口,也實現(xiàn)了 Runnable 接口,所以它可以通過 Executor 的 execute 方法執(zhí)行。

3. AsyncTask 初探

基本概念介紹完了,現(xiàn)在可以來看 AsyncTask 了。先介紹幾個重要的成員變量,代碼看上去有點長,不要被嚇到,其實很簡單:

//刪掉了一些暫時無需關(guān)注的成員變量和方法
public abstract class AsyncTask<Params, Progress, Result> {

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE = 1;

    public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
   
    private final WorkerRunnable<Params, Result> mWorker;
    private final FutureTask<Result> mFuture;
   
    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
    ...

    private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }
}

下面逐個說明一下:

  • THREAD_POOL_EXECUTOR : 一個指定了最大線程數(shù)量的 Executor。最大線程數(shù)是 MAXIMUM_POOL_SIZE, 可以看到其值取決于 CPU 的核心數(shù),如果是4核,那么最大線程數(shù)就是9個。但是這個線程池并沒有直接用來執(zhí)行異步任務(wù),后面會看到。
  • SERIAL_EXECUTOR: 從名字可以看出是一個串行執(zhí)行任務(wù)的 Executor,其具體實現(xiàn)在由下面的 SerialExecutor 類定義。實現(xiàn)方式也很簡單。維護(hù)了一個隊列 mTasks,mTasks.offer在隊尾插入任務(wù),mTasks.poll() 從隊首取出任務(wù)。怎么實現(xiàn)一個任務(wù)執(zhí)行后接著執(zhí)行下一個呢?這里很巧妙的,在插入任務(wù)的時候做了一層包裝,用一個 Runnable 包住一個 Runnable,有點代理模式的感覺。外層的Runnable 在執(zhí)行完內(nèi)層的Runnable之后,最終一定會執(zhí)行 finally 中進(jìn)行取下一個任務(wù)并執(zhí)行的操作,這樣就實現(xiàn)了一個不斷線性執(zhí)行任務(wù)的調(diào)度隊列。
  • sDefaultExecutor: 這個就是默認(rèn)的執(zhí)行異步任務(wù)的 Executor 了,其值被指定為SERIAL_EXECUTOR。所以我們知道了所有的 AsyncTask 最終會按照先后順序依次執(zhí)行,而并不是并發(fā)的執(zhí)行。
  • mWorker: 就是一個 Callable 接口多了一個保存參數(shù)的數(shù)組而已
  • mFuture: 是一個FutureTask 對象, 正是前面提到的 Future 接口的一個實現(xiàn)類

至此,我們看到了前面介紹到的 ExecutorService,Callable,F(xiàn)uture, FutureTask 都已經(jīng)悉數(shù)登場,有了前面兩節(jié)的鋪墊,再看這些類應(yīng)該毫無壓力了。接下來就是如何組織運用它們了。

4. AsyncTask 的構(gòu)造函數(shù)

AsyncTask 的構(gòu)造函數(shù)并不復(fù)雜,但是可以說 AsyncTask 架構(gòu)的關(guān)鍵部分都在這里定義了。上源碼:

    public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);

                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
                return postResult(result);
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }//構(gòu)造函數(shù)到此結(jié)束,下面只是涉及到的方法和類

    private void postResultIfNotInvoked(Result result) {
        final boolean wasTaskInvoked = mTaskInvoked.get();
        if (!wasTaskInvoked) {
            postResult(result);
        }
    }

    private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }
    
    //一個用來封裝數(shù)據(jù)的類,mData 字段,可以封裝任務(wù)的執(zhí)行結(jié)果數(shù)據(jù)
    //也可以封裝任務(wù)的執(zhí)行進(jìn)度數(shù)據(jù)
    private static class AsyncTaskResult<Data> {
        final AsyncTask mTask;
        final Data[] mData;

        AsyncTaskResult(AsyncTask task, Data... data) {
            mTask = task;
            mData = data;
        }
    }

可以看到主要是完成 mWorkermFuture 兩個變量的初始化。

  • mWorkercall 方法內(nèi)部,調(diào)用了 doInBackground 方法,這個太熟悉了,就是我們使用AsyncTask必須要實現(xiàn)的方法,執(zhí)行我們需要在子線程執(zhí)行的業(yè)務(wù)邏輯。執(zhí)行完之后的返回值調(diào)用 postResult 方法。postResult 方法里面的操作我們也很熟悉,就是通過一個 Handler 把結(jié)果發(fā)送出去,這個Handler 對象后面再講。注意 mTaskInvoked.set(true),這里mTaskInvoked 是一個提供了原子操作的布爾型對象,用來標(biāo)記任務(wù)已經(jīng)被調(diào)用
  • mFuture:只實現(xiàn)了一個 done 方法,這個方法會在任務(wù)終止(正常執(zhí)行完或被取消)的時候調(diào)用。這里的 postResultIfNotInvoked 邏輯有點奇怪,明明只要任務(wù)一開始執(zhí)行, mWorkercall 方法就會被調(diào)用,mTaskInvoked 就肯定被設(shè)置為 true 了,怎么會有等于 false 的情況出現(xiàn)呢?最后答案在這里 ,是為了修復(fù)一個bug,這里就不深究了,我們只要知道絕大多數(shù)情況下,wasTaskInvoked 都會被設(shè)為 ture 就好了。

4. AsyncTask 里的 Handler

上一節(jié)中的 postResult 里我們看到了一個 getHandler() 方法,現(xiàn)在就來研究這個 Handler 有什么特別了 (其實真的沒什么特別的...):

    private static Handler getHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler();
            }
            return sHandler;
        }
    }

    private static class InternalHandler extends Handler {
        public InternalHandler() {
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }

    protected final void publishProgress(Progress... values) {
        if (!isCancelled()) {
            getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                    new AsyncTaskResult<Progress>(this, values)).sendToTarget();
        }
    }

getHandler() 返回了一個 InternalHandler。InternalHandler 處理兩種消息,一種是任務(wù)執(zhí)行結(jié)果的消息 MESSAGE_POST_RESULT,一種是任務(wù)執(zhí)行進(jìn)度的消息 MESSAGE_POST_PROGRESS。
收到任務(wù)執(zhí)行結(jié)果的消息會調(diào)用 AsyncTask 的 finish 方法,在這個方法里面,我們看到了熟悉的 onPostExecute(Result result) 。而收到任務(wù)執(zhí)行進(jìn)度的消息,會調(diào)用 onProgressUpdate(Progress... values) ,這個消息是在調(diào)用 publishProgress(Progress... values) 方法的時候發(fā)送的。

這里的關(guān)鍵是在 InternalHandler 的構(gòu)造函數(shù)里,傳入的 Looper 是通過 Looper.getMainLooper() 方法得到的,這正是 UI 線程的 Looper,因而也就確保了 onPostExecuteonProgressUpdate 方法都在 UI 線程執(zhí)行。

至此,我們已經(jīng)在源碼中見到了通常使用 AsyncTask會用到的方法:
doInBackground 在子線程執(zhí)行異步任務(wù), publishProgress 在子線程發(fā)布任務(wù)執(zhí)行進(jìn)度,onProgressUpdateonPostExecute 在 UI 線程接收任務(wù)執(zhí)行進(jìn)度和結(jié)果。好像還差了一個沒見到,onPreExecute,別著急,下面馬上就來

5. 最后一步,讓 AsyncTask 運轉(zhuǎn)起來

一切準(zhǔn)備工作已經(jīng)就緒,當(dāng)我們定義好一個 AsyncTask,最終要執(zhí)行的時候,就要調(diào)用 execute 方法:

    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }

execute 方法會調(diào)用到 executeOnExecutor(Executor exec, Params... params),傳入的 Executor 就是第二節(jié)介紹的 SerialExecutor。在這個方法里看到了 onPreExecute()確實是在任務(wù)執(zhí)行前調(diào)用 。最后exec.execute(mFuture); 終于到了任務(wù)執(zhí)行的地方。

6. 總結(jié) AsyncTask = Executor + Handler

經(jīng)過一番分析,理解了 AsyncTask 的內(nèi)部實現(xiàn),發(fā)現(xiàn)它主要就是封裝了一個可以串行執(zhí)行任務(wù)的 Executor 以及處理線程間消息傳遞的 Handler,極大的簡化了異步任務(wù)的處理。不過這里并沒有涉及到并發(fā)訪問資源的同步問題,因為所有的任務(wù)都被串行化執(zhí)行了。其實在3.0之前的幾個版本里任務(wù)曾經(jīng)是并發(fā)執(zhí)行的,但是這時候我們在 doInBackgroud 里面就要小心了,對于共享的資源訪問必須要考慮同步問題。大概Google考慮到多數(shù)情況下程序員們不會意識到這個問題,所以最終還是改為串行執(zhí)行了吧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容