EventBus源碼分析(二)

上一篇關(guān)于訂閱和取消訂閱的分析:http://m.itdecent.cn/p/3f08a23c4544
上一篇對訂閱和取消訂閱進行了一個源碼分析,簡單來講就是我們在類中調(diào)用@Subscribe所訂閱事件的方法在訂閱過程被封裝成了subscriberMethod對象并被逐一添加到subscriptionsByEventType和typesBySubscriber這兩個map中去,取消訂閱則是分別從這兩個map中移除相關(guān)的映射關(guān)系。
注冊訂閱事件后,接下來看一下是如何發(fā)送訂閱事件的,發(fā)送訂閱事件使用的是:

  EventBus.getDefault().post(TestEvent())

點擊進去看一下post方法:

    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };
    /** Posts the given event to the event bus. */
    public void post(Object event) {
        //分裝成PostingThreadState 對象
        PostingThreadState postingState = currentPostingThreadState.get();
        //從postingState獲取事件隊列
        List<Object> eventQueue = postingState.eventQueue;
        // 將當(dāng)前要發(fā)送的事件加入到隊列中
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //while 不斷輪詢發(fā)送事件
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

    /** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

currentPostingThreadState是一個ThreadLocal類型的變量(ThreadLocal的作用:ThreadLocal是解決線程安全問題一個很好的思路,它通過為每個線程提供一個獨立的變量副本解決了變量并發(fā)訪問的沖突問題。在很多情況下,ThreadLocal比直接使用synchronized同步機制解決線程安全問題更簡單,更方便,且結(jié)果程序擁有更高的并發(fā)性。)currentPostingThreadState中存儲了當(dāng)前線程對應(yīng)的事件列表和線程的狀態(tài)信息等,上述主要調(diào)用了輪詢調(diào)用postSingleEvent方法,看一下postSingleEvent方法:

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        //獲取該事件對象類型
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            //是否支持事件繼承,默認(rèn)為true,如果訂閱了父類型,當(dāng)發(fā)送子類型事件實也會調(diào)用其相關(guān)訂閱方法
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
         // 找不到該事件的異常處理
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

上述eventInheritance默認(rèn)為true,表示如果訂閱了父類型,當(dāng)發(fā)送子類型事件實也會調(diào)用其相關(guān)訂閱方法,最終是調(diào)用postSingleEventForEventType進行分發(fā),看一下postSingleEventForEventType:

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        //CopyOnWriteArrayList是一個線程安全的list,寫入時會復(fù)制一份數(shù)據(jù)出來,之后再賦值回去
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //從subscriptionsByEventType map中獲取該eventType下的所有訂閱對象,subscriptionsByEventType會不會有點熟悉??
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
             // 遍歷該eventType對應(yīng)下的訂閱對象,并調(diào)用postToSubscription執(zhí)行分發(fā)操作
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

subscriptionsByEventType會不會覺得有點熟悉??這個便是我們上一篇訂閱分析所提到的,這個map key為某一事件類型,value為該事件類型下的所有訂閱,我們從subscriptionsByEventType中獲取該eventType下的所有訂閱后對其進行遍歷,并逐一調(diào)用postToSubscription()把事件分發(fā)到每一個訂閱對象中去,繼續(xù)看postToSubscription,這里我們會根據(jù)訂閱方法指定的threadMode信息來執(zhí)行不同的發(fā)布策略:

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

threadMode總共有以下幾種類型
POSTING:執(zhí)行invokeSubscriber()方法,就是直接反射調(diào)用;
MAIN:首先去判斷當(dāng)前是否在UI線程,如果是的話則直接反射調(diào)用,否則調(diào)用mainThreadPoster#enqueue(),即把當(dāng)前的方法加入到隊列之中,然后通過handler去發(fā)送一個消息,在handler的handleMessage中去執(zhí)行方法。具體邏輯在HandlerPoster.java中;
MAIN_ORDERED:與上面邏輯類似,順序執(zhí)行我們的方法;
BACKGROUND:判斷當(dāng)前是否在UI線程,如果不是的話直接反射調(diào)用,是的話通過backgroundPoster.enqueue()將方法加入到后臺的一個隊列,最后通過線程池去執(zhí)行;
ASYNC:與BACKGROUND的邏輯類似,將任務(wù)加入到后臺的一個隊列,最終由Eventbus中的一個線程池去調(diào)用,這里的線程池與BACKGROUND邏輯中的線程池用的是同一個。

這里先取一個分支來看,假設(shè)我們指定事件監(jiān)聽最后是回到主線程,也即是平常常使用的 @Subscribe(threadMode = ThreadMode.MAIN),那么這里將會來到 case MAIN:分支,第一步先判斷發(fā)送事件的時候(即調(diào)用event post)是不是在主線程,是的話直接執(zhí)行invokeSubscriber()使用反射執(zhí)行方法

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //使用反射執(zhí)行
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

如果發(fā)送事件的時候(即調(diào)用event post)不是在主線程,則執(zhí)行mainThreadPoster.enqueue(subscription, event)方法,那這個mainThreadPoster是什么呢?

//簡化代碼
private final Poster mainThreadPoster;
....
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
....
    MainThreadSupport getMainThreadSupport() {
        if (mainThreadSupport != null) {
            return mainThreadSupport;
        } else if (AndroidLogger.isAndroidLogAvailable()) {
            Object looperOrNull = getAndroidMainLooperOrNull();
            return looperOrNull == null ? null :
                    new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
        } else {
            return null;
        }
    }
....
public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {

        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

可以看到,mainThreadPoster內(nèi)部創(chuàng)建了一個主線程Looper,并最終new了一個HandlerPoster,HandlerPoster是mainThreadPoster的實現(xiàn)類,這里大概可以猜到mainThreadPoster其實就是主線程的handler,看一下HandlerPoster如何實現(xiàn),先看一下enqueue方法:

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //構(gòu)建一個PendingPost對象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //添加進PendingPost隊列
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                //調(diào)用sendMessage發(fā)送消息,從而觸發(fā)handleMessage
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
  ......省略

PendingPostQueue是一個簡單實現(xiàn)的鏈表,內(nèi)部保存了兩個PendingPost對象 ,一頭一尾,尾部插入,頭部移除,調(diào)用enqueue()從鏈表尾部插入,調(diào)用poll()從鏈表頭部移除,每次插入后調(diào)用sendMessage從而回調(diào)到handleMessage,看一下handleMessage是如何從這個PendingPostQueue取出這個訂閱事件的:

  ......省略
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            //記錄開始時間
            long started = SystemClock.uptimeMillis();
            while (true) {
                //while循環(huán)一直從PendingPostQueue取出PendingPost
                PendingPost pendingPost = queue.poll();
                //如果隊列為空則取消
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //這里同樣使用反射調(diào)用了方法
                eventBus.invokeSubscriber(pendingPost);
                //判斷執(zhí)行是否超過了指定時間,是的話重新調(diào)用sendMessage方法
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

可以看出,其實mainThreadPoster主要是幫我們回調(diào)到主線程,其內(nèi)部本質(zhì)上還是調(diào)用了反射去執(zhí)行方法,關(guān)于handleMessage,第一步是用while循環(huán)不斷輪詢?nèi)〕鲫犃兄械腜endingPost,當(dāng)隊列為空則停止輪詢,當(dāng)執(zhí)行處理方法的時長過長時則重新調(diào)用sendMessage,從而繼續(xù)回調(diào)到handleMessage,這是為了防止執(zhí)行時間過長,導(dǎo)致while循環(huán)阻塞主線程造成卡頓。
本篇博文到此結(jié)束,關(guān)于其他類型的threadMode執(zhí)行操作,可自行在研究,差不多。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容