EventBus3.0 源碼解析

簡(jiǎn)介

我們知道,Android應(yīng)用主要是由4大組件構(gòu)成。當(dāng)我們進(jìn)行組件間通訊時(shí),由于位于不同的組件,通信方式相對(duì)麻煩?;诖?,EventBus便油然而生。通過EventBus,我們可以很輕松的進(jìn)行組件間通信。

使用方法

EventBus in 3 steps

Define events:

public static class MessageEvent { /* Additional fields if needed */ }

Prepare subscribers: Declare and annotate your subscribing method, optionally specify a thread mode:

@Subscribe(threadMode = ThreadMode.MAIN)
 public void onMessageEvent(MessageEvent event) {/* Do something */};

Register and unregister your subscriber. For example on Android, activities and fragments should usually register according to their life cycle:

@Override 
public void onStart() { 
    super.onStart(); 
    EventBus.getDefault().register(this); 
} 
 @Override
 public void onStop() { 
     super.onStop();
     EventBus.getDefault().unregister(this); 
}

Post events:

EventBus.getDefault().post(new MessageEvent());

Read the full getting started guide.

源碼解析

EventBus的使用,無外乎主要就是訂閱,事件發(fā)送和解除訂閱3大步驟,那么源碼解析我們也從這3個(gè)方面進(jìn)行入手。

(1). 訂閱

使用EventBus時(shí),第一步肯定是先進(jìn)行注冊(cè),通常使用的就是:

EventBus.getDefault().register(this);

那我們就先來看下:EventBus.getDefault()

public EventBus() {
        this(DEFAULT_BUILDER);
    }

 /** Convenience singleton for apps using a process-wide EventBus instance. */
    public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

可以看到,getDefault()其實(shí)就是用的雙重鎖校驗(yàn)來實(shí)現(xiàn)一個(gè)單例EventBus,但是跟普通的單例模式不同的是,EventBus的默認(rèn)構(gòu)造函數(shù)確是public的,其實(shí)這里的public是有深層含義的,當(dāng)我們使用getDefault()時(shí)獲取的EventBus,只是一條事件總線,如果我們想構(gòu)造多條事件總線,那么我們就可以直接實(shí)例化EventBus即可。
接下來我們來看下EventBus的注冊(cè)過程:

/**
     * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
     * are no longer interested in receiving events.
     * <p/>
     * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
     * The {@link Subscribe} annotation also allows configuration like {@link
     * ThreadMode} and priority.
     */
    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        //找到subscriber源文件(及其父類)中注冊(cè)的事件方法
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                //按事件將所有的subscirber記錄下來,并依據(jù)priority進(jìn)行排序,
                // 如果是sticky事件,則從粘性記錄中查找是否前面已經(jīng)分發(fā)了該事件通知,
                // 如果是,則立即調(diào)用該subscriber的subscriberMethod。
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

從register源代碼可以看到,注冊(cè)過程主要分為2個(gè)步驟:
1.通過findSubscriberMethods()方法,就可以找到對(duì)應(yīng)subscriber中注冊(cè)的事件方法;
2.subscribe()過程,記錄subscriber并排序,對(duì)注冊(cè)者進(jìn)行粘性事件調(diào)用···

那首先,我們先分析下findSubscriberMethods(),其具體的查找過程如下:

 List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
 //是否忽略從注解器生成的MyEventBusIndex類獲取事件方法,默認(rèn)為false
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //將subscriberClass類(連同父類)對(duì)應(yīng)的@Subscribe方法進(jìn)行緩存
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

METHOD_CACHE是一個(gè)全局靜態(tài)的ConcurrentHashMap,

private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

從METHOD_CACHE中我們可以得知,它會(huì)緩存所有事件總線的所有subscriber對(duì)應(yīng)的所有事件方法。
EventBus3.0相對(duì)于之前的版本,性能上有了很大的提升,提升的主要原因就在于事件信息的獲取來源:舊版本事件信息都是通過采用反射來獲取,而新版本默認(rèn)是采用編譯器注解方式(如果還不清楚注解處理器的內(nèi)容,可以參考下我之前的簡(jiǎn)文:注解處理器(Annotation Processor)簡(jiǎn)析),在編譯的時(shí)候通過@Subscribe注解獲取事件信息,從而在效率上得到很大的提升。
所以,register中,findUsingReflection(subscriberClass)采用的便是運(yùn)行時(shí)反射獲取,findUsingInfo(subscriberClass)采用的就是從apt自動(dòng)生成的MyEventBusIndex類中獲取事件信息。

這里,我們就只對(duì)findUsingInfo()進(jìn)行分析:

 /**
     *
     * @param subscriberClass
     * @return 獲取subscriberClass及其父類的@Subscribe方法
     */
    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
        //從apt自動(dòng)生成的MyEventBusIndex類中獲取SubscriberInfo
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

這里FindState的設(shè)計(jì)采用享元模式,避免FindState的頻繁創(chuàng)建,具體詳情可以查看附錄介紹。

void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

findState.initForSubscriber(subscriberClass)中,我們可以得知findState.subscriberClass = findState.clazz = subscriberClass != null,所以findUsingInfo會(huì)進(jìn)入while循環(huán)。進(jìn)入循環(huán)后,這里有一個(gè)很重要的動(dòng)作就是:getSubscriberInfo

 private SubscriberInfo getSubscriberInfo(FindState findState) {
                 ···
                 ···
        //由EventBusBuillder從apt自動(dòng)生成的類中獲取
        if (subscriberInfoIndexes != null) {
            for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                //取得Subscriber類的相關(guān)信息:Subscriber類,@Subscribe相關(guān)方法等
                SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                if (info != null) {
                    return info;
                }
            }
        }
        return null;
    }
//SubscriberMethodFinder
private List<SubscriberInfoIndex> subscriberInfoIndexes;

如果subscriberInfoIndexes != null,那么就可以從其得到SubscriberInfo??吹竭@里,我們就有疑問了,你說subscriberInfoIndexes != null,那么它是在哪被賦值的呀?這里,我們就來回顧一下了,因?yàn)閟ubscriberInfoIndexes是SubscriberMethodFinder類的成員變量,所以很自然我們會(huì)找一些SubscriberMethodFinder在哪里會(huì)對(duì)subscriberInfoIndexes進(jìn)行賦值,

SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
                           boolean ignoreGeneratedIndex) {
        this.subscriberInfoIndexes = subscriberInfoIndexes;
        this.strictMethodVerification = strictMethodVerification;
        this.ignoreGeneratedIndex = ignoreGeneratedIndex;
    }

查看后,我們知道SubscriberMethodFinder是在構(gòu)造函數(shù)中對(duì)subscriberInfoIndexes進(jìn)行賦值,那么我們接下來應(yīng)該尋找的就是SubscriberMethodFinder是在哪里被實(shí)例化,還記得我們EventBus.register里面有一句:

subscriberMethodFinder.findSubscriberMethods(subscriberClass);

原來SubscriberMethodFinder的一個(gè)實(shí)例化就在EventBus中,那么我們就來查看下EventBus具體在哪個(gè)地方對(duì)其進(jìn)行創(chuàng)建:

EventBus(EventBusBuilder builder) {
        subscriptionsByEventType = new HashMap<>();
        typesBySubscriber = new HashMap<>();
        stickyEvents = new ConcurrentHashMap<>();
        mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        //index speed
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }

可以看到,SubscriberMethodFinder是在EventBus構(gòu)造函數(shù)中被實(shí)例化的,然后SubscriberMethodFinder.subscriberInfoIndexes指向的是builder.subscriberInfoIndexes,而EventBusBuilder是通過addIndex()對(duì)subscriberInfoIndexes進(jìn)行創(chuàng)建賦值的。


    /** Adds an index generated by EventBus' annotation preprocessor. */
    public EventBusBuilder addIndex(SubscriberInfoIndex index) {
        if(subscriberInfoIndexes == null) {
            subscriberInfoIndexes = new ArrayList<>();
        }
        subscriberInfoIndexes.add(index);
        return this;
    }

所以,如果想讓EventBus從apt中進(jìn)行數(shù)據(jù)獲取,還要通過顯示構(gòu)建一個(gè)EventBusBuilder,并調(diào)用
EventBus.builder().addIndex(new MyEventBusIndex()).build()方法進(jìn)行創(chuàng)建賦值。
具體的做法如下:參考官方文檔

現(xiàn)在讓我們回到findUsingInfo方法中,如果getSubscriberInfo == null,那么它也會(huì)通過反射進(jìn)行事件信息獲取。也就是說,如果你本意想通過apt進(jìn)行數(shù)據(jù)獲取,但是可能由于不小心缺少了相應(yīng)步驟配置,導(dǎo)致無法從apt獲取,那么,程序會(huì)自動(dòng)采用反射進(jìn)行獲取。結(jié)果是可以成功運(yùn)行的,只是效率低了許多。

現(xiàn)在假設(shè)我們成功配置了apt,那么findUsingInfo就會(huì)進(jìn)入下面代碼:

 while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            }
···
 findState.moveToSuperclass();
}

這部分代碼最終就會(huì)獲取到subscriberClass及其父類的@Subscribe方法存放到 findState.subscriberMethods中,最終返回給上層調(diào)用。
以上,查找事件信息過程就已結(jié)束了。

接下來,我們來看下subscribe過程:

/**
     *
     * @param subscriber
     * @param subscriberMethod
     * Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
     * subscriptionsByEventType中存儲(chǔ)的就是事件對(duì)應(yīng)的監(jiān)聽組信息。
     * 由事件獲取得到監(jiān)聽組,然后新的subscriber和SubscriberMethod按priority存放進(jìn)同一event的
     * CopyOnWriteArrayList<Subscription>中.
     *
     * 如果當(dāng)前的Subscriber事件方法是sticky的,則從stickyEvents記錄中查找看是否有與本方法事件
     * 相同的事件(即之前已經(jīng)有組件發(fā)送該粘性事件{@link #postSticky(Object)},如果有,則立即
     * 調(diào)用本方法。
     */
    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        //記錄subscriber對(duì)應(yīng)的所有事件類型
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    //candidateEventType跟eventType是否是同一類型
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

可以看出,subscribe過程主要做了3件事:
1.將當(dāng)前的subscriberMethod按priority添加到subscriptionsByEventType中的同一evenType中
2.將當(dāng)前事件eventType添加到當(dāng)前subscriber的事件集合中,映射關(guān)系表由typesBySubscriber記錄
3.如果當(dāng)前事件是粘性事件,那么從系統(tǒng)保存的粘性發(fā)送事件stickyEvents中,找尋與當(dāng)前事件相同(或與當(dāng)前事件類型有繼承關(guān)系)的事件,取出事件參數(shù),并立即調(diào)用當(dāng)前事件方法,傳入事件參數(shù)。

最后,以一張圖來顯示register過程:


EventBus::register

(2). 事件分發(fā)

接下來,我們來看一下EventBus的事件分發(fā)過程:

 /** Posts the given event to the event bus. */
    public void post(Object event) {
        //取得post線程當(dāng)前狀態(tài)
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        //為當(dāng)前線程增加一個(gè)分發(fā)事件
        eventQueue.add(event);

        //當(dāng)前線程未處于分發(fā)狀態(tài)
        if (!postingState.isPosting) {
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //分發(fā)當(dāng)前線程所有事件
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

可以看到,post過程首先做的是取出當(dāng)前線程的線程狀態(tài),currentPostingThreadState是一個(gè)ThreadLocal對(duì)象,這樣可以很好的保證在不同的線程中都擁有一份線程獨(dú)立的PostingThreadState對(duì)象。
post過程首先取出當(dāng)前線程狀態(tài)對(duì)象:PostingThreadState postingState ,然后將當(dāng)前分發(fā)的事件添加進(jìn)線程對(duì)象postingState中,最后,如果當(dāng)前線程不處于分發(fā)狀態(tài),那么就循環(huán)遍歷當(dāng)前線程所有分發(fā)事件,取出事件進(jìn)行分發(fā)。分發(fā)函數(shù)為:postSingleEvent

 private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //eventInheritance:是否支持事件繼承分發(fā),默認(rèn)為true
        if (eventInheritance) {
            //存儲(chǔ)eventClass及其父類/接口,
            // 即發(fā)送一個(gè)事件,那么注冊(cè)該事件以及該事件的父類/接口的訂閱者都會(huì)收到該事件通知
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        //如果分發(fā)的事件沒有對(duì)應(yīng)的訂閱者
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            //則發(fā)送一個(gè)NoSubscriberEvent事件
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

postSingleEvent為單個(gè)事件分發(fā)流程,eventInheritance標(biāo)志事件是否支持繼承分發(fā),默認(rèn)為true(可以在EventBusBuilder中查看到),即EventBus默認(rèn)會(huì)分發(fā)該事件到注冊(cè)該事件或者該事件父類/接口的訂閱者事件函數(shù)中。所以,postSingleEvent主要做了2件事:
1.判斷是否支持事件繼承分發(fā):如果支持,那么就通過當(dāng)前事件class對(duì)象,找到其父類/接口所有事件類型,然后循環(huán)取出每個(gè)事件類別,依次調(diào)用postSingleEventForEventType進(jìn)行實(shí)際的事件分發(fā);如果不支持事件繼承分發(fā),那么就直接將當(dāng)前事件分發(fā)給當(dāng)前事件訂閱者。
2.如果當(dāng)前分發(fā)事件沒有對(duì)應(yīng)的訂閱者,那么就會(huì)發(fā)送一個(gè)NoSubscriberEvent事件給到當(dāng)前事件總線。

而將事件分發(fā)給訂閱者,主要做了哪些事呢?讓我們看一下源碼:

 private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

原來,事件分發(fā)給訂閱者主要做的就是:從之前register的訂閱者事件集合subscriptionsByEventType中,通過當(dāng)前事件,得到對(duì)應(yīng)的訂閱者集合,結(jié)合當(dāng)前的post線程對(duì)象狀態(tài),依次調(diào)用postToSubscription真正地將事件分發(fā)給各個(gè)訂閱者。

我們知道,在使用EventBus過程中,我們可以在不同的線程post事件,然后訂閱該事件的事件函數(shù)會(huì)根據(jù)threadMode會(huì)自動(dòng)進(jìn)行線程切換,那這是怎樣做到的呢?想了解清楚這點(diǎn),那么我們就要看一下postToSubscription函數(shù)了:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

這里,我們可以看到,postToSubscription會(huì)根據(jù)每個(gè)訂閱者的事件函數(shù)的threadMode標(biāo)識(shí),對(duì)應(yīng)當(dāng)前線程狀態(tài),進(jìn)行線程間轉(zhuǎn)換,從而保證了訂閱者事件函數(shù)會(huì)運(yùn)行在與threadMode指定的線程中。
從源碼中,我們可以看到,具體的threadMode所對(duì)應(yīng)的線程為:

  • POSTING:訂閱者事件函數(shù)運(yùn)行在與post線程同一線程。
  • MAIN:如果post線程是主線程,那么直接調(diào)用訂閱者事件函數(shù);如果post不是在主線程,那么通過mainThreadPoster將訂閱者函數(shù)切換到主線程上運(yùn)行。
  • BACKGROUND:如果調(diào)用者是主線程,那么通過backgroundPoster將訂閱者事件函數(shù)切換到后臺(tái)線程上運(yùn)行,反之則在post線程上直接運(yùn)行。
  • ASYNC:無論post處于哪個(gè)線程,都會(huì)直接重開一條線程執(zhí)行訂閱者函數(shù)。

我們看到,post事件后,如果是處于同一條執(zhí)行線程,EventBus是通過invokeSubscriber()函數(shù)讓訂閱者函數(shù)得到回調(diào),具體做法如下:

 void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

其實(shí)就是通過反射進(jìn)行方法調(diào)用。

然后如果是要進(jìn)行線程切換回調(diào)訂閱者事件函數(shù),則是采用各種Poster的enqueue方法,那么,我們接下來就來看下它們是具體怎樣進(jìn)行線程切換的。

  • ** 切換到主線程:mainThreadPoster.enqueue(subscription, event)**

首先,mainThreadPoster是一個(gè)HandlerPoster對(duì)象:

final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

在回顧下EventBus的構(gòu)造函數(shù),可以看到:

 EventBus(EventBusBuilder builder) {
       ···
      ···
       mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
      ···
      ···

可以看到,mainThreadPoster就是一個(gè)主線程的Handler,然后再enqueue的時(shí)候,先用一個(gè)PendingPost保存一下當(dāng)前的訂閱者信息和事件,并存儲(chǔ)到一個(gè)queue中,然后通過sendMessage()給自己發(fā)了一個(gè)消息,那么自然handleMessage()就會(huì)得到回掉,然后在HandlerPoster的handleMessage中可以看到,它通過在queue中獲取先前存入的事件信息后,調(diào)用了eventBus.invokeSubscriber(pendingPost):

void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

所以invokeSubscriber最終也是采用發(fā)射的方式調(diào)用到了訂閱者的事件函數(shù),而且由于是在主線程Handler的handlerMessage中調(diào)用,那么訂閱者事件函數(shù)肯定是運(yùn)行在主線程中的。

  • ** 切換到后臺(tái)線程: backgroundPoster.enqueue(subscription, event)**
    backgroundPoster是一個(gè)BackgroundPoster對(duì)象:
final class BackgroundPoster implements Runnable {

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {

        while (true) {
            PendingPost pendingPost = queue.poll(1000);
                    ···
            eventBus.invokeSubscriber(pendingPost);
                    ···
            executorRunning = false;
        }
         
    }

以上代碼經(jīng)過簡(jiǎn)化,可以看到BackgroundPoster就是一個(gè)Runnable,然后enqueue的時(shí)候,也是把訂閱者相關(guān)信息和事件存儲(chǔ)到一個(gè)PendingPost 中,最后通過線程池的方式執(zhí)行自身,那么自己的run()方法就會(huì)得到調(diào)用,而run()方法里面做的就是從queue中獲取數(shù)據(jù),最后調(diào)用方式方式回調(diào)訂閱者事件函數(shù)。這里有一點(diǎn)要注意的就是,BackgroundPoster在線程池執(zhí)行一個(gè)任務(wù)時(shí),executorRunning是會(huì)被置成true的,完成一個(gè)任務(wù)后,才會(huì)被重置成false,也就是說,BackgroundPoster的任務(wù)是串行運(yùn)行的。

  • ** 新開線程:asyncPoster.enqueue(subscription, event)**
    asyncPoster是一個(gè)AsyncPoster對(duì)象:
/**
 * Posts events in background.
 * 
 * @author Markus
 */
class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}

從AsyncPoster源碼中可以看到,其實(shí)現(xiàn)機(jī)制與BackgroundPoster是基本一致的。唯一值得注意的是,AsyncPoster來一個(gè)任務(wù)就立即執(zhí)行,也就是說AsyncPoster的任務(wù)是并行運(yùn)行的。

綜上:對(duì)于EventBus的線程切換邏輯,他們的做法其實(shí)原理基本一致:
都是通過將訂閱者相關(guān)信息和事件存儲(chǔ)到一個(gè)隊(duì)列里面,然后再異步從相關(guān)回調(diào)(對(duì)于Handler來說就是handlerMessage(),對(duì)于線程池來說就是Runnable.run())中,反射執(zhí)行訂閱者事件函數(shù),從而達(dá)成線程切換這一功能。

用一張圖來說明下post過程:


EventBus::post

register,post都講了,最后,再說一下unregister就圓滿了。

(3). 解除訂閱

unregister過程:

/** Unregisters the given subscriber from all event classes. */
    public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

 /** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

源碼中可以看出,unregister主要就是做了2件事:

  1. 由訂閱者subscriber從typesBySubscriber中取出當(dāng)前訂閱者訂閱的所有事件集合。
  2. 遍歷訂閱事件,依次取出一個(gè)事件,然后從subscriptionsByEventType中獲得所有訂閱該事件的訂閱者集合,從中找到該取消注冊(cè)的subscriber,將其active狀態(tài)設(shè)為false,并從該事件集合中刪除。


    EventBus::unregister

最后,文字總結(jié)下:
Register過程:
注冊(cè)過程主要執(zhí)行2個(gè)操作:

  1. 由當(dāng)前訂閱者subscriber,通過apt或者反射方式獲取當(dāng)前subscriber及其父類事件訂閱函數(shù)(@Subscribe).
  2. subscribe過程,subscribe就是訂閱者的注冊(cè)過程,該過程會(huì)將當(dāng)前訂閱者與其(及其父類)訂閱方法記錄起來,主要做以下4大操作:
    1). 依據(jù)上面獲取到的事件集合,對(duì)每一個(gè)事件進(jìn)行操作。依據(jù)事件類別,從subscriptionsByEventType取出所有訂閱該事件的訂閱者集合(沒有訂閱,則進(jìn)行記錄創(chuàng)建)。
    2). 依據(jù)當(dāng)前事件的priority插入到訂閱者集合中,從而實(shí)現(xiàn)優(yōu)先級(jí)事件。
    3). 將當(dāng)前事件存儲(chǔ)到當(dāng)前訂閱者subscriber的事件集合中,方便后續(xù)unregister(依據(jù)subscriber獲取得到subscriber的事件集合,再遍歷事件集合,從各個(gè)事件注冊(cè)集合中,刪除該subscriber)。
    4). 如果當(dāng)前事件是sticky事件,那么就從stickyEvents集合中(postSticky()會(huì)記錄事件類型和事件實(shí)例)獲取得到事件實(shí)例,并讓當(dāng)前訂閱者subscriber進(jìn)行調(diào)用。

Post過程:
post過程就是一個(gè)事件分發(fā)過程,其最基礎(chǔ)的元素就是事件。
post一個(gè)事件的時(shí)候,主要經(jīng)歷以下4大歷程:

  1. 將當(dāng)前事件添加到當(dāng)前線程狀態(tài)事件列表中
  2. 循環(huán)取出當(dāng)前線程事件,依次進(jìn)行事件分發(fā):
  • 如果當(dāng)前事件總線支持事件繼承分發(fā),那么就獲取當(dāng)前事件及其所有父類/接口類別,存儲(chǔ)到集合中,然后遍歷集合,依次按照事件類別進(jìn)行分發(fā);
  • 如果當(dāng)前事件總線不支持事件繼承分發(fā),那么直接將當(dāng)前事件進(jìn)行分發(fā);
  • 如果未找到當(dāng)前事件訂閱者(沒有訂閱),則post一個(gè)NoSubscriberEvent事件
  1. 根據(jù)當(dāng)前事件類別,進(jìn)行事件分發(fā)。事件分發(fā)首先從注冊(cè)記錄中找到該事件類別的訂閱者集合,然后遍歷訂閱者集合,進(jìn)行單個(gè)訂閱者事件分發(fā)。
  2. 進(jìn)行單個(gè)訂閱者事件分發(fā)會(huì)依據(jù)訂閱者事件函數(shù)threadMode以及當(dāng)前線程狀態(tài)(是否是mainThread),自動(dòng)進(jìn)行線程切換。

Unregister過程:
unregister過程主要也是由2個(gè)操作:

  1. 由當(dāng)前要unregister的訂閱者subscriber,獲取得到其訂閱的所有事件集合。
  2. 遍歷上面得到的事件集合,依據(jù)事件從subscriptionsByEventType中取出所有訂閱了該事件的訂閱者集合,從中找到這個(gè)要unregister的訂閱者,將其active狀態(tài)設(shè)為false,并從事件集合中進(jìn)行刪除,這樣,就完成了unregister過程。

附錄

to be continue

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容