EventBus3.0源碼解析

前言

EventBus是一個(gè)優(yōu)秀的事件訂閱/發(fā)布框架,充分解耦了事件的發(fā)布者和訂閱者,簡(jiǎn)化了代碼。 記得剛來公司,老大讓我看的第一個(gè)框架就是EventBus, 用了這么久的EventBus,居然沒看過源碼,實(shí)在慚愧。。。雖然網(wǎng)上已經(jīng)有很多解析EventBus的文章了,還是決定自己寫一篇,這樣可以有更深層次、印象更深的理解。

EventBus的使用

這篇文章詳細(xì)講解了EventBus的三要素、四種線程模型,以及普通事件和粘性事件的訂閱和分發(fā),這里不再多說。

EventBus3.0源碼解析

通過EventBus的使用可以看到EventBus主要分為兩部分,事件的訂閱(注冊(cè))和分發(fā)。

看源碼之前先來熟悉幾個(gè)概念:

  • 事件: 可以是任意類型的對(duì)象, 包括自定義的事件. 通過EventBus.getDefault().post(event)分發(fā)出去, 在訂閱方法的參數(shù)中接收事件.
  • 訂閱者: EventBus.getDefault().register(MainActivity.this), MainActivity就是訂閱者.
  • 訂閱方法: 被@Subscribe修飾的方法, SubscriberMethod描述了訂閱方法相關(guān)信息
    final Method method;//訂閱方法
    final ThreadMode threadMode;//線程模型
    final Class<?> eventType;//傳遞進(jìn)來的事件的class對(duì)象
    final int priority;//訂閱方法的優(yōu)先級(jí)
    final boolean sticky;//是否是粘性事件
    String methodString;//方法的名稱
  • 訂閱關(guān)系Subscription:關(guān)聯(lián)訂閱者和訂閱方法
    final Object subscriber;//訂閱者
    final SubscriberMethod subscriberMethod;//訂閱方法
    volatile boolean active;//是否有效, 初始化為true, unregister時(shí)置為false
  • 注解Subscribe: 用來標(biāo)識(shí)訂閱者的訂閱方法
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    //線程模型
    ThreadMode threadMode() default ThreadMode.POSTING;
    //是否是粘性事件,默認(rèn)false
    boolean sticky() default false;
    //優(yōu)先級(jí), 影響同一個(gè)分發(fā)線程中事件分發(fā)的順序
    int priority() default 0;
}
  • 線程模型ThreadMode: 決定了事件將在哪一個(gè)線程中執(zhí)行,分以下四種:
    POSTING(默認(rèn)): 在哪個(gè)線程分發(fā)事件,就在哪個(gè)線程處理事件,不需要切換線程.避免執(zhí)行耗時(shí)操作, 有可能會(huì)阻塞主線程.
    ?適用于那些不需要主線程就能快速完成的簡(jiǎn)單任務(wù)
    MAIN: 在主線程中處理事件, 如果在主線程中分發(fā)事件,就直接處理事件, 如果不在主線程中分發(fā)事件,就通過Handler post到主線程進(jìn)行處理. 避免進(jìn)行耗時(shí)操作,阻塞主線程.
    ?適用于更新UI等必須在主線程處理的事件.
    BACKGROUND:在后臺(tái)進(jìn)程中處理事件. 如果不在主線程中分發(fā)事件, 就在同一個(gè)線程中處理事件. 如果在主線程中分發(fā)事件, 就啟用唯一的后臺(tái)進(jìn)程, 依次執(zhí)行隊(duì)列中所有的事件. 避免執(zhí)行長(zhǎng)時(shí)間耗時(shí)操作, 影響隊(duì)列中其他事件的處理;
    ?適用于一般的耗時(shí)操作;
    ASYNC: 單獨(dú)開一個(gè)線程處理事件, 獨(dú)立于主線程和分發(fā)事件的線程. 因?yàn)椴l(fā)線程數(shù)量有限, 應(yīng)避免同一時(shí)間觸發(fā)大量長(zhǎng)時(shí)間耗時(shí)操作的異步處理方法. EventBus會(huì)使用一個(gè)線程池來有效回收空閑的異步處理線程.
    ?適用于訪問網(wǎng)絡(luò)等長(zhǎng)時(shí)間耗時(shí)操作.

事件注冊(cè)流程

OK, 基本概念講清楚了, 下面來看下事件注冊(cè)的入口EventBus.getDefault().register(this). 一般通過EventBus.getDefault()獲取EventBus的實(shí)例, 來看下創(chuàng)建及初始化的過程.

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public static EventBus getDefault() {
        //單例
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }
public EventBus() {
        this(DEFAULT_BUILDER);
    }
    //初始化了所有訂閱/發(fā)布過程中用到的參數(shù)
    EventBus(EventBusBuilder builder) {
        //鍵: 事件的class對(duì)象   值: 所有訂閱關(guān)系集合CopyOnWriteArrayList<Subscription>
        subscriptionsByEventType = new HashMap<>();
        //鍵: 訂閱者   值: 事件class對(duì)象的集合
        typesBySubscriber = new HashMap<>();
        //鍵:事件class對(duì)象   值: 粘性事件
        stickyEvents = new ConcurrentHashMap<>();
        //主線程處理器
        mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
        //后臺(tái)線程處理器
        backgroundPoster = new BackgroundPoster(this);
        //異步線程處理器
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        //查找緩存訂閱方法的工具類
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        //是否繼承事件, true的話, 發(fā)送子事件時(shí), 也會(huì)發(fā)送父類事件.默認(rèn)為true
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }

下面來看注冊(cè)的方法:

public void register(Object subscriber) {
        //獲取到訂閱者對(duì)象
        Class<?> subscriberClass = subscriber.getClass();
        //查找訂閱者中所有的訂閱方法
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            //在同步代碼塊中對(duì)訂閱方法逐一進(jìn)行訂閱
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

SubscriberMethodFinder是查找和緩存訂閱方法的工具類, 看下findSubscriberMethods方法:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //METHOD_CACHE緩存訂閱者和訂閱者中所有訂閱方法的map
        //鍵: 訂閱者對(duì)象, 值: List<SubscriberMethod>訂閱方法的集合
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
         //直接使用緩存中的訂閱方法結(jié)合
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //是否忽略注解器EventBusAnnotationProcessor生成的MyEventBusIndex
        if (ignoreGeneratedIndex) {
            //通過反射查找訂閱者中的訂閱方法集合
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //在MyEventBusIndex類中查找訂閱者的訂閱方法集合
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        //搜索完為空,拋異常, 不為空, 放入METHOD_CACHE緩存
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

注解器EventBusAnnotationProcessor是什么鬼, 還能自動(dòng)生成MyEventBusIndex來保存所有訂閱者與訂閱方法集合, 后面再詳細(xì)講解. 先看通過反射查找訂閱方法findUsingReflection.

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        //內(nèi)部類FindState, 用來保存和校驗(yàn)訂閱方法
        FindState findState = prepareFindState();
        //初始化FindState, 把訂閱者class對(duì)象傳進(jìn)去
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            //查找當(dāng)前訂閱者對(duì)象中的訂閱方法
            findUsingReflectionInSingleClass(findState);
            //查找父類中的訂閱方法, clazz變?yōu)楦割悓?duì)象,以此循環(huán)查找父類中的訂閱方法
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }
private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            從FindState池中找到不為空的FindState,返回,達(dá)到復(fù)用的目的
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
       //池中都為空, new一個(gè)FindState
        return new FindState();
    }

再來看如何根據(jù)FindState查找所有訂閱方法findUsingReflectionInSingleClass.

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 拿到該訂閱者的所有方法
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            //方法的修飾符必須是public,不是abstract,不是static
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                //方法必須只有一個(gè)參數(shù)
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                     //方法被Subscribe注解
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        //檢查findstate是否添加了該方法,沒有返回true
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            //將訂閱方法的所有信息保存到findState.subscriberMethods
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

循環(huán)保存訂閱者及父類中所有訂閱方法之后, 返回getMethodsAndRelease(findState)

private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        //獲取到之前保存的findState.subscriberMethods,最終返回
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
       //清空findState的狀態(tài)
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            //把findState放到池中空閑的位置, 以復(fù)用
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }

這是通過反射查找訂閱者中訂閱方法集合, 通過注解器EventBusAnnotationProcessor生成的MyEventBusIndex查找的方法findUsingInfo也放在后面介紹.
至此, 查找訂閱者及父類中所有訂閱方法結(jié)束了, 再看下逐一訂閱的方法subscribe.

 private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        //獲取到事件
        Class<?> eventType = subscriberMethod.eventType;
        //創(chuàng)建這種訂閱關(guān)系
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //從緩存中根據(jù)事件class對(duì)象獲取訂閱關(guān)系集合
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            //緩存中沒有, new一個(gè)訂閱關(guān)系集合,保存
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            //緩存中訂閱關(guān)系集合,且已經(jīng)包含了這種訂閱關(guān)系,拋異常
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            //如果當(dāng)前訂閱關(guān)系優(yōu)先級(jí)大于集合中其他訂閱關(guān)系,插在前面
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        //緩存中根據(jù)訂閱者查找事件對(duì)象的集合
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        //緩存沒有, new一個(gè)加入緩存
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
        //如果是粘性事件,直接分發(fā), 因此先分發(fā)再訂閱也能處理事件
        if (subscriberMethod.sticky) {
            //繼承事件的話, 從緩存中獲取所有鍵是該事件對(duì)象或其父類的事件, 然后分發(fā)
            if (eventInheritance) {
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    //獲取鍵: 事件對(duì)象
                    Class<?> candidateEventType = entry.getKey();
                    //鍵是當(dāng)前訂閱方法的事件對(duì)象或其父類
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        //分發(fā)粘性事件
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                //只從緩存中獲取該事件對(duì)象對(duì)應(yīng)的事件
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

盜個(gè)圖總結(jié)一下整個(gè)注冊(cè)流程: 圖來源

register.png

注冊(cè)流程看完了, 在看一下取消注冊(cè)的代碼:

public synchronized void unregister(Object subscriber) {
        //根據(jù)訂閱者獲取所有事件對(duì)象
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                //刪除訂閱者中事件是eventType 的訂閱方法
                unsubscribeByEventType(subscriber, eventType);
            }
            //typesBySubscriber中刪除訂閱者相關(guān)信息
            typesBySubscriber.remove(subscriber);
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        //根據(jù)事件對(duì)象獲取所有該事件的訂閱方法
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
           //遍歷所有訂閱方法
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                //是當(dāng)前要取消注冊(cè)的訂閱者, 就刪除該訂閱方法, 并把a(bǔ)ctive狀態(tài)置為false
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

事件分發(fā)流程

事件分普通事件和粘性事件, 分發(fā)也分兩種:
普通事件EventBus.getDefault().post(event);
粘性事件:EventBus.getDefault().postSticky(event);

public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        post(event);
    }

由此可以看出粘性事件就是在分發(fā)之前保存到stickyEvents中了, 其他與普通事件相同, 看普通事件如何分發(fā);

public void post(Object event) {
        //當(dāng)前分發(fā)線程的狀態(tài)
        PostingThreadState postingState = currentPostingThreadState.get();
        //當(dāng)前分發(fā)線程的事件隊(duì)列
        List<Object> eventQueue = postingState.eventQueue;
        //把當(dāng)前事件加入隊(duì)列
        eventQueue.add(event);

        if (!postingState.isPosting) {
            //是否在主線程分發(fā)
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //循環(huán)從隊(duì)列中移除一個(gè)事件并分發(fā)
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

currentPostingThreadState是一個(gè)包含PostingThreadState的ThreadLocal對(duì)象.
PostingThreadState記錄了當(dāng)前分發(fā)線程的具體信息,包括事件隊(duì)列,分發(fā)狀態(tài), 正在分發(fā)的事件及訂閱關(guān)系等.
ThreadLocal是一個(gè)線程內(nèi)部的數(shù)據(jù)存儲(chǔ)類, 通過它可以在不同的線程之中互不干擾地存儲(chǔ)并提供數(shù)據(jù), 具體介紹戳這里.
下面看如何分發(fā)隊(duì)列中的事件postSingleEvent

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            //繼承事件的話, 要分發(fā)當(dāng)前事件及其所有父類及接口
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                //只要有一個(gè)分發(fā)成功就返回true
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            //不繼承的話, 只分發(fā)當(dāng)前事件
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        //如果沒有訂閱該事件
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                //發(fā)送一個(gè)NoSubscriberEvent
                post(new NoSubscriberEvent(this, event));
            }
        }
    }
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            //鍵: 事件class對(duì)象   值: 事件及其父類或父接口的class對(duì)象集合
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
            if (eventTypes == null) {
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                while (clazz != null) {
                    eventTypes.add(clazz);//加入當(dāng)前事件class對(duì)象
                    //如果是接口, 加入所有父接口對(duì)象
                    addInterfaces(eventTypes, clazz.getInterfaces());
                    clazz = clazz.getSuperclass();//循環(huán)遍歷父類
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }

postSingleEvent判斷了一下是否要繼承事件, 繼承的話再對(duì)每一個(gè)事件(包括父類及父接口)分發(fā), 不繼承的話只分發(fā)當(dāng)前事件, 真正分發(fā)事件是在postSingleEventForEventType.

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //根據(jù)事件對(duì)象,獲取訂閱關(guān)系的集合
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        //有訂閱者訂閱了該事件, 就一一處理, 最后返回true
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                //把事件及訂閱關(guān)系保存到分發(fā)線程中
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //處理事件
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    //分發(fā)完清空分發(fā)線程狀態(tài)
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        //沒有訂閱者訂閱該事件,返回false
        return false;
    }

private final HandlerPoster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                //不管當(dāng)前在哪個(gè)線程, 直接處理事件
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                //如果當(dāng)前在主線程, 則直接處理事件, 不在主線程則轉(zhuǎn)到mainThreadPoster中處理事件
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                //如果當(dāng)前在主線程, 則轉(zhuǎn)到backgroundPoster中處理, 如果不在主線程中,就直接處理
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                //直接轉(zhuǎn)到asyncPoster中處理
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

invokeSubscriber就是利用反射調(diào)用訂閱者的訂閱方法.

void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

盜個(gè)圖來總結(jié)一下整個(gè)Post流程就是:

post.png

mainThreadPoster, backgroundPoster, asyncPoster是如何實(shí)現(xiàn)線程切換的呢?

線程切換

看三大Poster之前先看兩個(gè)類: PendingPost和PendingPostQueue.
PendingPost是一個(gè)待發(fā)送事件, 里面保存了將要發(fā)送的事件及訂閱方法, 并且維護(hù)了一個(gè)靜態(tài)待發(fā)送事件池,達(dá)到復(fù)用的目的.

final class PendingPost {
    //靜態(tài)待發(fā)送事件池
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    Object event;//將要發(fā)送的事件
    Subscription subscription;//將要發(fā)送的訂閱關(guān)系
    PendingPost next;//下一個(gè)待發(fā)送事件

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }
    //從池中獲取一個(gè)待發(fā)送事件
    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                //從末尾獲取待發(fā)送事件, 保存?zhèn)鬟f進(jìn)來的事件和訂閱關(guān)系,返回
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        //如果池中數(shù)量為空, new一個(gè)PendingPost返回
        return new PendingPost(event, subscription);
    }
    //釋放當(dāng)前pendingPost
    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        //先把pendingPost中數(shù)據(jù)置空, 在保存在池中, 以復(fù)用
        synchronized (pendingPostPool) {
            //保證池中數(shù)量不超過10000
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }
}

PendingPostQueue相當(dāng)于一個(gè)待發(fā)送事件隊(duì)列, 維護(hù)了一個(gè)頭和一個(gè)尾.

final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;
    //把傳進(jìn)來的pendingPost放到隊(duì)列尾部
    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }
     //取出隊(duì)列的頭部
    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }
    //隊(duì)列為空的話,等maxMillisToWait毫秒再取出頭部
    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }
}

OK, 理解了這兩個(gè)類再來看三大Poster, 就好理解了.

  1. HandlerPoster主線程處理器
final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;//待發(fā)送事件隊(duì)列
    private final int maxMillisInsideHandleMessage;//處理消息超時(shí)時(shí)間
    private final EventBus eventBus;
    private boolean handlerActive;//是否正在處理消息

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();//初始化隊(duì)列
    }

    void enqueue(Subscription subscription, Object event) {
        //從靜態(tài)池中取出一個(gè)PendingPost, 保存當(dāng)前事件和訂閱關(guān)系
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
             //加入隊(duì)列
            queue.enqueue(pendingPost);
            //如果當(dāng)前沒有正在處理消息, 則發(fā)送一個(gè)空消息, 觸發(fā)handleMessage去處理消息
            if (!handlerActive) {
                handlerActive = true;
                //如果發(fā)送失敗, 拋異常
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;//是否重新處理
        try {
            long started = SystemClock.uptimeMillis();
            //循環(huán)從隊(duì)列中取出一個(gè)待發(fā)送事件處理
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        //確保隊(duì)列中沒有待發(fā)送事件之后, handlerActive置為false, 退出循環(huán)
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //交給eventBus處理待發(fā)送的事件
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                //如果消息處理超時(shí), 發(fā)送一個(gè)空消息, 重新出發(fā)消息處理
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

可以看到EventBus處理事件的時(shí)候, 通過mainThreadPoster.enqueue(subscription, event)把事件加入了mainThreadPoster的隊(duì)列中, Handler的消息處理機(jī)制不斷循環(huán)從隊(duì)列中取出待發(fā)送事件, 通過eventBus.invokeSubscriber(pendingPost)又傳給EventBus處理, 這就實(shí)現(xiàn)了線程的切換, 因?yàn)镋ventBus初始化mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10)時(shí)指定了Looper.getMainLooper(), 也就指定了mainThreadPoster 是主線程中的Handler.
然后來看下EventBus的invokeSubscriber(pendingPost)

void invokeSubscriber(PendingPost pendingPost) {
        //取出待發(fā)送事件中的事件及訂閱關(guān)系
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        //把pendingPost置空,并加入到靜態(tài)待發(fā)送事件池中
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            //通過反射調(diào)用訂閱者的訂閱方法
            invokeSubscriber(subscription, event);
        }
    }
  1. BackgroundPoster后臺(tái)線程處理器
final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;//待發(fā)送事件隊(duì)列
    private final EventBus eventBus;
    private volatile boolean executorRunning;//是否正在處理

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();//初始化隊(duì)列
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            //當(dāng)前線程沒有運(yùn)行的話, 從線程池中new一個(gè)線程或取一個(gè)空閑線程來執(zhí)行run()方法
            if (!executorRunning) {
                executorRunning = true;
                //eventBus.getExecutorService()默認(rèn)為Executors.newCachedThreadPool();
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                //循環(huán)從隊(duì)列中取出待發(fā)送事件,交給EventBus處理
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            //確保隊(duì)列中沒有事件, executorRunning置為false, 退出循環(huán)
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //在當(dāng)前子線程中把待發(fā)送事件交給EventBus處理
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

BackgroundPoster 原理類似HandlerPoster, 把事件加入BackgroundPoster 的隊(duì)列中, 然后啟用了線程池中的線程來取出待發(fā)送事件, 并交給EventBus, 通過反射來處理事件.

  1. AsyncPoster 異步線程處理器
class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

是不是跟BackgroundPoster 幾乎一樣, 不同點(diǎn)就在于每次enqueue加入隊(duì)列之后都調(diào)用了eventBus.getExecutorService()線程池中的一個(gè)新線程或空閑線程來執(zhí)行操作( 取出一個(gè)待發(fā)送事件, 交給EventBus處理), 及每個(gè)事件都是在獨(dú)立的線程中處理的. 而BackgroundPoster加入隊(duì)列之后, 只有在未運(yùn)行的時(shí)候才會(huì)從線程池中取一個(gè)新線程或空閑線程來執(zhí)行操作(循環(huán)從隊(duì)列中取出待發(fā)送事件并處理), 如果正在運(yùn)行, 就在當(dāng)前線程中執(zhí)行操作.

EventBusAnnotationProcessor

EventBus源碼中的EventBusAnnotationProcessor, 繼承了AbstractProcessor, 是的, 就是應(yīng)用APT技術(shù), 在編譯時(shí)掃描所有@Subscribe注解, 自動(dòng)生成MyEventBusIndex來保存所有訂閱者中的訂閱信息, 當(dāng)然MyEventBusIndex名稱可以自己配置, 具體配置方法可參考這篇文章.
首先看下生成的MyEventBusIndex的結(jié)構(gòu):

public class MyEventBusIndex implements SubscriberInfoIndex {
    //鍵: 訂閱者對(duì)象  值: 訂閱者的訂閱信息
    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

    static {
        SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
        //掃描所有訂閱者中被@Subscribe注解的所有方法, 以SimpleSubscriberInfo對(duì)象的形式保存在SUBSCRIBER_INDEX中
        putIndex(new SimpleSubscriberInfo(TestRunnerActivity.class, true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onEventMainThread", TestFinishedEvent.class, ThreadMode.MAIN),
        }));

        putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusMain.class,
                true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onEventMainThread", TestEvent.class, ThreadMode.MAIN),
        }));

        putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.SubscribeClassEventBusDefault.class,
                true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onEvent", TestEvent.class),
        }));

        putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscriberClassEventBusAsync.class,
                true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onEventAsync", TestEvent.class, ThreadMode.ASYNC),
        }));

        putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusBackground.class,
                true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onEventBackgroundThread", TestEvent.class, ThreadMode.BACKGROUND),
        }));

    }

    private static void putIndex(SubscriberInfo info) {
        SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
    }
    //通過訂閱者對(duì)象, 取出所有訂閱信息
    @Override
    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
        SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
        if (info != null) {
            return info;
        } else {
            return null;
        }
    }
}

如何利用自動(dòng)生成的MyEventBusIndex呢?

EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
public EventBusBuilder addIndex(SubscriberInfoIndex index) {
        if(subscriberInfoIndexes == null) {
            subscriberInfoIndexes = new ArrayList<>();
        }
        //把傳進(jìn)來的MyEventBusIndex保存到subscriberInfoIndexes中
        subscriberInfoIndexes.add(index);
        return this;
    }
public EventBus installDefaultEventBus() {
        synchronized (EventBus.class) {
            if (EventBus.defaultInstance != null) {
                throw new EventBusException("Default instance already exists." +
                        " It may be only set once before it's used the first time to ensure consistent behavior.");
            }
            EventBus.defaultInstance = build();
            return EventBus.defaultInstance;
        }
    }
public EventBus build() {
        return new EventBus(this);
    }

MyEventBusIndex繼承了SubscriberInfoIndex, 把MyEventBusIndex加入到EventBusBuilder中, 再通過installDefaultEventBus生成EventBus實(shí)例, 傳到EventBus靜態(tài)實(shí)例defaultInstance中.到此就把MyEventBusIndex中的訂閱信息導(dǎo)入EventBus實(shí)例了.
還記得EventBus構(gòu)造方法中訂閱方法掃描器subscriberMethodFinder 的初始化嗎, 就是把EventBusBuilder的subscriberInfoIndexes信息傳給了掃描器.

subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);

然后來看通過MyEventBusIndex查找訂閱者的訂閱方法findUsingInfo.

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
       FindState findState = prepareFindState();
       findState.initForSubscriber(subscriberClass);
       while (findState.clazz != null) {
           //查找訂閱信息
           findState.subscriberInfo = getSubscriberInfo(findState);
           if (findState.subscriberInfo != null) {
               //獲取到該訂閱者的所有訂閱方法
               SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
               for (SubscriberMethod subscriberMethod : array) {
                   //校驗(yàn)是否有重復(fù)的事件相同的訂閱方法, 沒有返回true
                   if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                       //沒有重復(fù), 把這個(gè)訂閱方法存到findState.subscriberMethods中
                       findState.subscriberMethods.add(subscriberMethod);
                   }
               }
           } else {
               //沒有找到在通過發(fā)射查找
               findUsingReflectionInSingleClass(findState);
           }
           findState.moveToSuperclass();
       }
       return getMethodsAndRelease(findState);
   }
private SubscriberInfo getSubscriberInfo(FindState findState) {
       //如果已經(jīng)找到訂閱者相關(guān)信息
       if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
           SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
           if (findState.clazz == superclassInfo.getSubscriberClass()) {
               return superclassInfo;
           }
       }
       //沒有訂閱者相關(guān)信息, 遍歷所有通過EventBusAnnotationProcessor生成的SubscriberInfoIndex 類, 查找對(duì)應(yīng)訂閱者的訂閱信息
       if (subscriberInfoIndexes != null) {
           for (SubscriberInfoIndex index : subscriberInfoIndexes) {
               SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
               if (info != null) {
                   return info;
               }
           }
       }
       return null;
   }

關(guān)于EventBusAnnotationProcessor是如何在編譯期間掃描注解生成MyEventBusIndex的, 我也沒看太懂(尷尬...), 感興趣的可以自己去看源碼.
最后感謝:
http://m.itdecent.cn/p/f057c460c77e
http://blog.csdn.net/lsyz0021/article/details/51985307

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 簡(jiǎn)介 我們知道,Android應(yīng)用主要是由4大組件構(gòu)成。當(dāng)我們進(jìn)行組件間通訊時(shí),由于位于不同的組件,通信方式相對(duì)麻...
    Whyn閱讀 598評(píng)論 0 1
  • 看了好幾次EventBus源碼了,這次總算基本上搞清楚了,簡(jiǎn)單來說就是調(diào)用 EventBus.getDefault...
    一只好奇的茂閱讀 424評(píng)論 0 20
  • 相關(guān)文章Android事件總線(一)EventBus3.0用法全解析 前言 上一篇我們講到了EventBus3.0...
    劉望舒閱讀 2,761評(píng)論 4 24
  • 主目錄見:Android高級(jí)進(jìn)階知識(shí)(這是總目錄索引)?因?yàn)閲?guó)慶放假的緣故,好幾天沒有寫文章,今天抽空來寫一篇,那...
    ZJ_Rocky閱讀 1,894評(píng)論 6 7
  • EventBus用法及源碼解析目錄介紹1.EventBus簡(jiǎn)介1.1 EventBus的三要素1.2 EventB...
    楊充211閱讀 2,054評(píng)論 0 4

友情鏈接更多精彩內(nèi)容