Android源碼:Handler, Looper和MessageQueue實現(xiàn)解析

做了6年的Android開發(fā),此間做的事情非常雜包括ROM,SDK和APP,從來沒有好好的研究過Android的基礎(chǔ)代碼。趁著這段時間項目沒那么忙,把基礎(chǔ)的東西仔細研究清楚并記錄下來。

想到要分析Looper和Handler的時候其實正在看背光調(diào)節(jié)模塊的代碼,其中看到PowerManagerService中很多API最終是向Handler發(fā)送Message的形式進行實際操作。Handler在做App的時候用的很多,最常見的用法就是extend一個Handler類然后重載其handleMessage方法,然后要發(fā)消息的時候調(diào)用其sendMessage API即可。使用Handler還可以在不同的線程之間傳遞消息實現(xiàn)異步操作,下邊我們扒一扒Android N的源碼看看其實現(xiàn)以及Handler, Looper以及Thread之間的關(guān)系。

一,Looper, Thread之間的關(guān)系

先來看一段代碼

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    Handler h1 = new Handler();

    new Thread(new Runnable() {
        @Override public void run() {
            Handler h2 = new Handler();
        }
    }).start();
}

運行之后h1正常創(chuàng)建,但是創(chuàng)建h2的時候crash了:

--------- beginning of crash
E/AndroidRuntime: FATAL EXCEPTION: Thread-263
Process: com.example.stone.sfsandroidclient, PID: 32286
java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()
   at android.os.Handler.<init>(Handler.java:200)
   at android.os.Handler.<init>(Handler.java:114)
   at com.example.stone.sfsandroidclient.MainActivity$1.run(MainActivity.java:71)
   at java.lang.Thread.run(Thread.java:818)

出錯日志提示不能在一個沒有調(diào)用過Looper.prepare()的Thread里邊new Handler()。為什么Looper.prepare()會影響Handler的創(chuàng)建呢,先看源碼哪里會拋出這個異常:

static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

public Handler(Callback callback, boolean async) {
    ... ...
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    ... ...
}

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

這個sThreadLocal是一個ThreadLocal(線程局部)對象,它有g(shù)et()/set()這么一對方法。它的工作機制是,僅當一個線程調(diào)用了它的set()來設(shè)置值之后,get()才能獲取到設(shè)置進去的對象,而且這個對象是線程唯一的。也就是說在ThreadA里面調(diào)用了sThreadLocal.set(),在ThreadB里面調(diào)用sThreadLocal.get()還是null。所以在Looper.prepare()里一定調(diào)用了set():

public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

public static void prepareMainLooper() {
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}

為什么主線程上來就有Looper而我們自己new的Thread就沒有呢,因為在主線程里邊有這段(代碼):

public static void main(String[] args) {
    ... ...
    Looper.prepareMainLooper();
    ... ...
}

所以主線程是“生來”就有一個Looper跟它綁定在一起的,而且這個looper身份很特殊,它是整個進程里面的main looper,通過Looper.getMainLooper()就能拿到,接著就可以為所欲為的做些更新UI的活兒了。
那么其他的Thread要怎樣才能也擁有一個Looper么,Android為我們提供了一個很好的例子 - HandlerThread類,用法如下:

HandlerThread ht = new HandlerThread("worker");
ht.start();
Handler handler = new Handler(ht.getLooper());

關(guān)鍵就在HandlerThread的run里面:

@Override
public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
        mLooper = Looper.myLooper();
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
}

從上述分析可知Looper.prepare()之后這個Thread就有了一個跟它關(guān)聯(lián)在一起的Looper了,這里頭有兩個需要注意的點:一是new Handler()要在ht.start()之后,二是如果要extend HandlerThread則記得要在run()里面調(diào)用super.run()。

二, Looper, Handler和MessageQueue

還是看Handler的構(gòu)造函數(shù)

public Handler(Callback callback, boolean async) {
    ... ...
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

可知它們的關(guān)系為,一個HandlerThread帶有一個Looper,Looper有一個MessageQueue和多個與之關(guān)聯(lián)的Handler,而一個Handler只與一個Looper有關(guān)聯(lián)。UML類圖如下:

handler_looper.png

在使用中接觸得最多的是Message,它在MessageQueue里面以單項鏈表的形式組織,mMessages指向鏈表頭部。每個Java層的MessageQueue都有一個C++實現(xiàn)的NativeMessageQueue與之對應(yīng),關(guān)鍵的實現(xiàn)部分其實在native層中,下邊的章節(jié)會分析。

三,工作機制

先借一張《Efficient Android Threading》里面的圖:

looper.png

顧名思義,Looper的主要執(zhí)行過程是無線循環(huán)的loop(),它在HandlerThread的run()運行起來:

public static void loop() {
    final Looper me = myLooper();
    final MessageQueue queue = me.mQueue;

    for (;;) {
        // queue.next()是阻塞調(diào)用,后面詳細分析
        Message msg = queue.next(); // might block
        // 當next()返回null的時候,表示Looper.quit()被調(diào)用了所以退出無限循環(huán)
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }
        ... ...
        try {
            // 找到了msg就交給target(Handler)開干吧
            msg.target.dispatchMessage(msg);
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }
        // Message可回收利用,這里不展開了
        msg.recycleUnchecked();
    }
}

Handler的dispatchMessage()就沒什么可說的了:

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

里面要么就是執(zhí)行msg自己定義的callback,要么就是在extend Handler時定義的handleMessage方法。要記得的就是這個方法的調(diào)用是在Handler關(guān)聯(lián)的Looper所關(guān)聯(lián)的線程里面執(zhí)行的。所以如果你的Handler關(guān)聯(lián)的是main Looper,就不要在handleMessage()里面做耗時的操作(網(wǎng)絡(luò)請求,IO,密集計算等)了,否則很有可能造成ANR。
接著分析最關(guān)鍵的queue.next():

Message next() {
    ... ...
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        // 阻塞在native層
        nativePollOnce(ptr, nextPollTimeoutMillis);
        // mMessages有可能被其他線程修改(通過sendMessage),鎖起來
        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // 這是一個有故事的msg因為target為null表示這個一個barrier(圍欄)
                // barrier可以阻止卡住所有sync類型的msg的執(zhí)行,它通過postSyncBarrier()插入,
                // 通過removeSyncBarrier()去除,當barrier去除后所有被攔住的sync msg都被依次執(zhí)行
                // 目前源碼中用到了barrier的就只有view的invalidation機制了。
                // 如果特別重要的msg還是有特權(quán)可以執(zhí)行的,async msg就是這個例外。
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                // 當沒有barrier而且有sync msg,或者有barrier但是找到async msg的時候
                if (now < msg.when) {
                    // msg定義的when還沒到,讓native繼續(xù)等nextPollTimeoutMillis時長
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // 有到點的msg,返回給Looper的loop()處理
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // 沒有msg了,在native層阻塞
                nextPollTimeoutMillis = -1;
            }

            // 如果quit()調(diào)用了,通知Looper.loop()退出無限循環(huán)
            if (mQuitting) {
                dispose();
                return null;
            }
        ... ...
        // 在idle handler執(zhí)行的時候有可能來了新的mesasge,設(shè)為0不阻塞馬上進行檢查
        nextPollTimeoutMillis = 0;
    }
}

捎帶講講barrier的插入/拔出,沒興趣的可以略過畢竟用得少:

private int postSyncBarrier(long when) {
    // 將barrier插入到mMessges隊列中,回收回來的msg的target是空的
    // 把token返回給調(diào)用者,拔掉barrier的時候是需要token的
    synchronized (this) {
        final int token = mNextBarrierToken++;
        final Message msg = Message.obtain();
        msg.markInUse();
        msg.when = when;
        msg.arg1 = token;

        Message prev = null;
        Message p = mMessages;
        if (when != 0) {
            while (p != null && p.when <= when) {
                prev = p;
                p = p.next;
            }
        }
        if (prev != null) { // invariant: p == prev.next
            msg.next = p;
            prev.next = msg;
        } else {
            msg.next = p;
            mMessages = msg;
        }
        return token;
    }
}

public void removeSyncBarrier(int token) {
    synchronized (this) {
        Message prev = null;
        Message p = mMessages;
        // 對一下token
        while (p != null && (p.target != null || p.arg1 != token)) {
            prev = p;
            p = p.next;
        }
        if (p == null) {
            throw new IllegalStateException("The specified message queue synchronization "
                    + " barrier token has not been posted or has already been removed.");
        }
        final boolean needWake;
        if (prev != null) {
            prev.next = p.next;
            needWake = false;
        } else {
            mMessages = p.next;
            needWake = mMessages == null || mMessages.target != null;
        }
        p.recycleUnchecked();

        // 拔出barrier喚醒阻塞在native的looper所在線程
        if (needWake && !mQuitting) {
            nativeWake(mPtr);
        }
    }
}

四, native層的NativeMessageQueue機制

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // 交給native looper去poll了
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;
    ... ...
}

native looper使用了linux的epoll機制:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // native looper實現(xiàn)了很復雜的功能,可以從fd中讀取遠端響應(yīng)并返回
        // 但是MessageQueue只用到了其阻塞的特點,所以這里忽略了一大段代碼
        ... ...
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
    ... ...
    // 同樣,這樣刪了更大一段代碼因為我們只關(guān)心阻塞
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // 不管返回什么,不阻塞的返回就好
    if (eventCount < 0) {
        ... ...
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
        ... ...
        goto Done;
    }
    ... ...
Done: ;
    ... ...
    return result;
}

這里最最重要的就是epoll_wait()調(diào)用,在linux shell里面man epoll_wait得到提示:

NAME
       epoll_wait, epoll_pwait - wait for an I/O event on an epoll file descriptor

SYNOPSIS
       #include <sys/epoll.h>

       int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
... ...
RETURN VALUE
       When successful, epoll_wait() returns the number of file descriptors ready for the requested I/O, or zero if no file  descrip‐
       tor  became  ready  during the requested timeout milliseconds.  When an error occurs, epoll_wait() returns -1 and errno is set
       appropriately.

其中timeout即Looper中的timeoutMillis即Java層的MessageQueue.next()中的nextPollTimeoutMillis。重新看next()中nextPollTimeoutMillis有3種可能的選擇:0, -1或(int) Math.min(msg.when - now, Integer.MAX_VALUE);即最近一個msg的到期時間,傳到epoll_wait()中產(chǎn)生的效果分表是馬上返回、一直阻塞直至mEpollFd可用、阻塞msg.when-now的時長或者mEpollFd可用。這樣就達到了閑時線程阻塞交出CPU,msg到期之后Thread接著執(zhí)行的效果。當Thread正在永久阻塞的時候,還可以通過讓mEpollFd變?yōu)榭捎脕硗ㄖ€程跳出阻塞。

五,Native Looper的核心 - epoll

還是只關(guān)心阻塞和喚醒那部分,首先要先了解linux的epoll機制,epoll需要三個linux系統(tǒng)調(diào)用包括上邊的epoll_wait還有epoll_create以及epoll_ctl,看看native Looper是使用這些系統(tǒng)調(diào)用的:

Looper::Looper(bool allowNonCallbacks) :......{
    // 通過eventfd生成一個專門用于發(fā)射event的fd
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    ... ...
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
    // 通過epoll_create生成用于epoll_wait調(diào)用的fd
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // 通過epoll_ctl將mEpollFd和mWakeEventFd聯(lián)系起來
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);

}

至此,mEpollFd和mWakeEventFd就變成了一對好基友,當向mWakeEventFd寫入數(shù)據(jù)的時候 阻塞在epoll_wait(mEpollFd...);的線程將會被喚醒(向mWakeEventFd寫入event的話epoll_wait還能讀取到該event,我們只關(guān)心喚醒所以不作進一步解析)。所以native Looper的wake()是這樣實現(xiàn)的:

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    // 向mWakeEventFd里面寫了個1,用來喚醒足夠了
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal: %s", strerror(errno));
        }
    }
}

想要了解更多可以man epoll_ctl, man epoll_create以及查找linux epoll,管道pipe的實現(xiàn)機制??偟膩碚f,Android的Looper設(shè)計的是相當?shù)木伞?/p>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容