前言
EventBus是一個(gè)優(yōu)秀的事件訂閱/發(fā)布框架,充分解耦了事件的發(fā)布者和訂閱者,簡(jiǎn)化了代碼。 記得剛來公司,老大讓我看的第一個(gè)框架就是EventBus, 用了這么久的EventBus,居然沒看過源碼,實(shí)在慚愧。。。雖然網(wǎng)上已經(jīng)有很多解析EventBus的文章了,還是決定自己寫一篇,這樣可以有更深層次、印象更深的理解。
EventBus的使用
這篇文章詳細(xì)講解了EventBus的三要素、四種線程模型,以及普通事件和粘性事件的訂閱和分發(fā),這里不再多說。
EventBus3.0源碼解析
通過EventBus的使用可以看到EventBus主要分為兩部分,事件的訂閱(注冊(cè))和分發(fā)。
看源碼之前先來熟悉幾個(gè)概念:
- 事件: 可以是任意類型的對(duì)象, 包括自定義的事件. 通過EventBus.getDefault().post(event)分發(fā)出去, 在訂閱方法的參數(shù)中接收事件.
- 訂閱者: EventBus.getDefault().register(MainActivity.this), MainActivity就是訂閱者.
- 訂閱方法: 被@Subscribe修飾的方法, SubscriberMethod描述了訂閱方法相關(guān)信息
final Method method;//訂閱方法
final ThreadMode threadMode;//線程模型
final Class<?> eventType;//傳遞進(jìn)來的事件的class對(duì)象
final int priority;//訂閱方法的優(yōu)先級(jí)
final boolean sticky;//是否是粘性事件
String methodString;//方法的名稱
- 訂閱關(guān)系Subscription:關(guān)聯(lián)訂閱者和訂閱方法
final Object subscriber;//訂閱者
final SubscriberMethod subscriberMethod;//訂閱方法
volatile boolean active;//是否有效, 初始化為true, unregister時(shí)置為false
- 注解Subscribe: 用來標(biāo)識(shí)訂閱者的訂閱方法
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
//線程模型
ThreadMode threadMode() default ThreadMode.POSTING;
//是否是粘性事件,默認(rèn)false
boolean sticky() default false;
//優(yōu)先級(jí), 影響同一個(gè)分發(fā)線程中事件分發(fā)的順序
int priority() default 0;
}
- 線程模型ThreadMode: 決定了事件將在哪一個(gè)線程中執(zhí)行,分以下四種:
POSTING(默認(rèn)): 在哪個(gè)線程分發(fā)事件,就在哪個(gè)線程處理事件,不需要切換線程.避免執(zhí)行耗時(shí)操作, 有可能會(huì)阻塞主線程.
?適用于那些不需要主線程就能快速完成的簡(jiǎn)單任務(wù)
MAIN: 在主線程中處理事件, 如果在主線程中分發(fā)事件,就直接處理事件, 如果不在主線程中分發(fā)事件,就通過Handler post到主線程進(jìn)行處理. 避免進(jìn)行耗時(shí)操作,阻塞主線程.
?適用于更新UI等必須在主線程處理的事件.
BACKGROUND:在后臺(tái)進(jìn)程中處理事件. 如果不在主線程中分發(fā)事件, 就在同一個(gè)線程中處理事件. 如果在主線程中分發(fā)事件, 就啟用唯一的后臺(tái)進(jìn)程, 依次執(zhí)行隊(duì)列中所有的事件. 避免執(zhí)行長(zhǎng)時(shí)間耗時(shí)操作, 影響隊(duì)列中其他事件的處理;
?適用于一般的耗時(shí)操作;
ASYNC: 單獨(dú)開一個(gè)線程處理事件, 獨(dú)立于主線程和分發(fā)事件的線程. 因?yàn)椴l(fā)線程數(shù)量有限, 應(yīng)避免同一時(shí)間觸發(fā)大量長(zhǎng)時(shí)間耗時(shí)操作的異步處理方法. EventBus會(huì)使用一個(gè)線程池來有效回收空閑的異步處理線程.
?適用于訪問網(wǎng)絡(luò)等長(zhǎng)時(shí)間耗時(shí)操作.
事件注冊(cè)流程
OK, 基本概念講清楚了, 下面來看下事件注冊(cè)的入口EventBus.getDefault().register(this). 一般通過EventBus.getDefault()獲取EventBus的實(shí)例, 來看下創(chuàng)建及初始化的過程.
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public static EventBus getDefault() {
//單例
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
public EventBus() {
this(DEFAULT_BUILDER);
}
//初始化了所有訂閱/發(fā)布過程中用到的參數(shù)
EventBus(EventBusBuilder builder) {
//鍵: 事件的class對(duì)象 值: 所有訂閱關(guān)系集合CopyOnWriteArrayList<Subscription>
subscriptionsByEventType = new HashMap<>();
//鍵: 訂閱者 值: 事件class對(duì)象的集合
typesBySubscriber = new HashMap<>();
//鍵:事件class對(duì)象 值: 粘性事件
stickyEvents = new ConcurrentHashMap<>();
//主線程處理器
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
//后臺(tái)線程處理器
backgroundPoster = new BackgroundPoster(this);
//異步線程處理器
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//查找緩存訂閱方法的工具類
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
//是否繼承事件, true的話, 發(fā)送子事件時(shí), 也會(huì)發(fā)送父類事件.默認(rèn)為true
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
下面來看注冊(cè)的方法:
public void register(Object subscriber) {
//獲取到訂閱者對(duì)象
Class<?> subscriberClass = subscriber.getClass();
//查找訂閱者中所有的訂閱方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//在同步代碼塊中對(duì)訂閱方法逐一進(jìn)行訂閱
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
SubscriberMethodFinder是查找和緩存訂閱方法的工具類, 看下findSubscriberMethods方法:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//METHOD_CACHE緩存訂閱者和訂閱者中所有訂閱方法的map
//鍵: 訂閱者對(duì)象, 值: List<SubscriberMethod>訂閱方法的集合
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
//直接使用緩存中的訂閱方法結(jié)合
if (subscriberMethods != null) {
return subscriberMethods;
}
//是否忽略注解器EventBusAnnotationProcessor生成的MyEventBusIndex
if (ignoreGeneratedIndex) {
//通過反射查找訂閱者中的訂閱方法集合
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//在MyEventBusIndex類中查找訂閱者的訂閱方法集合
subscriberMethods = findUsingInfo(subscriberClass);
}
//搜索完為空,拋異常, 不為空, 放入METHOD_CACHE緩存
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
注解器EventBusAnnotationProcessor是什么鬼, 還能自動(dòng)生成MyEventBusIndex來保存所有訂閱者與訂閱方法集合, 后面再詳細(xì)講解. 先看通過反射查找訂閱方法findUsingReflection.
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//內(nèi)部類FindState, 用來保存和校驗(yàn)訂閱方法
FindState findState = prepareFindState();
//初始化FindState, 把訂閱者class對(duì)象傳進(jìn)去
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//查找當(dāng)前訂閱者對(duì)象中的訂閱方法
findUsingReflectionInSingleClass(findState);
//查找父類中的訂閱方法, clazz變?yōu)楦割悓?duì)象,以此循環(huán)查找父類中的訂閱方法
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
從FindState池中找到不為空的FindState,返回,達(dá)到復(fù)用的目的
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
//池中都為空, new一個(gè)FindState
return new FindState();
}
再來看如何根據(jù)FindState查找所有訂閱方法findUsingReflectionInSingleClass.
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 拿到該訂閱者的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
//方法的修飾符必須是public,不是abstract,不是static
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
//方法必須只有一個(gè)參數(shù)
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
//方法被Subscribe注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
//檢查findstate是否添加了該方法,沒有返回true
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//將訂閱方法的所有信息保存到findState.subscriberMethods
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
循環(huán)保存訂閱者及父類中所有訂閱方法之后, 返回getMethodsAndRelease(findState)
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
//獲取到之前保存的findState.subscriberMethods,最終返回
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
//清空findState的狀態(tài)
findState.recycle();
synchronized (FIND_STATE_POOL) {
//把findState放到池中空閑的位置, 以復(fù)用
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
這是通過反射查找訂閱者中訂閱方法集合, 通過注解器EventBusAnnotationProcessor生成的MyEventBusIndex查找的方法findUsingInfo也放在后面介紹.
至此, 查找訂閱者及父類中所有訂閱方法結(jié)束了, 再看下逐一訂閱的方法subscribe.
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//獲取到事件
Class<?> eventType = subscriberMethod.eventType;
//創(chuàng)建這種訂閱關(guān)系
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//從緩存中根據(jù)事件class對(duì)象獲取訂閱關(guān)系集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//緩存中沒有, new一個(gè)訂閱關(guān)系集合,保存
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//緩存中訂閱關(guān)系集合,且已經(jīng)包含了這種訂閱關(guān)系,拋異常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//如果當(dāng)前訂閱關(guān)系優(yōu)先級(jí)大于集合中其他訂閱關(guān)系,插在前面
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//緩存中根據(jù)訂閱者查找事件對(duì)象的集合
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
//緩存沒有, new一個(gè)加入緩存
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//如果是粘性事件,直接分發(fā), 因此先分發(fā)再訂閱也能處理事件
if (subscriberMethod.sticky) {
//繼承事件的話, 從緩存中獲取所有鍵是該事件對(duì)象或其父類的事件, 然后分發(fā)
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
//獲取鍵: 事件對(duì)象
Class<?> candidateEventType = entry.getKey();
//鍵是當(dāng)前訂閱方法的事件對(duì)象或其父類
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
//分發(fā)粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
//只從緩存中獲取該事件對(duì)象對(duì)應(yīng)的事件
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
盜個(gè)圖總結(jié)一下整個(gè)注冊(cè)流程: 圖來源

注冊(cè)流程看完了, 在看一下取消注冊(cè)的代碼:
public synchronized void unregister(Object subscriber) {
//根據(jù)訂閱者獲取所有事件對(duì)象
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//刪除訂閱者中事件是eventType 的訂閱方法
unsubscribeByEventType(subscriber, eventType);
}
//typesBySubscriber中刪除訂閱者相關(guān)信息
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//根據(jù)事件對(duì)象獲取所有該事件的訂閱方法
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
//遍歷所有訂閱方法
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
//是當(dāng)前要取消注冊(cè)的訂閱者, 就刪除該訂閱方法, 并把a(bǔ)ctive狀態(tài)置為false
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
事件分發(fā)流程
事件分普通事件和粘性事件, 分發(fā)也分兩種:
普通事件EventBus.getDefault().post(event);
粘性事件:EventBus.getDefault().postSticky(event);
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
由此可以看出粘性事件就是在分發(fā)之前保存到stickyEvents中了, 其他與普通事件相同, 看普通事件如何分發(fā);
public void post(Object event) {
//當(dāng)前分發(fā)線程的狀態(tài)
PostingThreadState postingState = currentPostingThreadState.get();
//當(dāng)前分發(fā)線程的事件隊(duì)列
List<Object> eventQueue = postingState.eventQueue;
//把當(dāng)前事件加入隊(duì)列
eventQueue.add(event);
if (!postingState.isPosting) {
//是否在主線程分發(fā)
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//循環(huán)從隊(duì)列中移除一個(gè)事件并分發(fā)
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
currentPostingThreadState是一個(gè)包含PostingThreadState的ThreadLocal對(duì)象.
PostingThreadState記錄了當(dāng)前分發(fā)線程的具體信息,包括事件隊(duì)列,分發(fā)狀態(tài), 正在分發(fā)的事件及訂閱關(guān)系等.
ThreadLocal是一個(gè)線程內(nèi)部的數(shù)據(jù)存儲(chǔ)類, 通過它可以在不同的線程之中互不干擾地存儲(chǔ)并提供數(shù)據(jù), 具體介紹戳這里.
下面看如何分發(fā)隊(duì)列中的事件postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//繼承事件的話, 要分發(fā)當(dāng)前事件及其所有父類及接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//只要有一個(gè)分發(fā)成功就返回true
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//不繼承的話, 只分發(fā)當(dāng)前事件
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果沒有訂閱該事件
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
//發(fā)送一個(gè)NoSubscriberEvent
post(new NoSubscriberEvent(this, event));
}
}
}
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
//鍵: 事件class對(duì)象 值: 事件及其父類或父接口的class對(duì)象集合
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);//加入當(dāng)前事件class對(duì)象
//如果是接口, 加入所有父接口對(duì)象
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();//循環(huán)遍歷父類
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
postSingleEvent判斷了一下是否要繼承事件, 繼承的話再對(duì)每一個(gè)事件(包括父類及父接口)分發(fā), 不繼承的話只分發(fā)當(dāng)前事件, 真正分發(fā)事件是在postSingleEventForEventType.
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//根據(jù)事件對(duì)象,獲取訂閱關(guān)系的集合
subscriptions = subscriptionsByEventType.get(eventClass);
}
//有訂閱者訂閱了該事件, 就一一處理, 最后返回true
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
//把事件及訂閱關(guān)系保存到分發(fā)線程中
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//處理事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
//分發(fā)完清空分發(fā)線程狀態(tài)
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
//沒有訂閱者訂閱該事件,返回false
return false;
}
private final HandlerPoster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
//不管當(dāng)前在哪個(gè)線程, 直接處理事件
invokeSubscriber(subscription, event);
break;
case MAIN:
//如果當(dāng)前在主線程, 則直接處理事件, 不在主線程則轉(zhuǎn)到mainThreadPoster中處理事件
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
//如果當(dāng)前在主線程, 則轉(zhuǎn)到backgroundPoster中處理, 如果不在主線程中,就直接處理
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//直接轉(zhuǎn)到asyncPoster中處理
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
invokeSubscriber就是利用反射調(diào)用訂閱者的訂閱方法.
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
盜個(gè)圖來總結(jié)一下整個(gè)Post流程就是:

mainThreadPoster, backgroundPoster, asyncPoster是如何實(shí)現(xiàn)線程切換的呢?
線程切換
看三大Poster之前先看兩個(gè)類: PendingPost和PendingPostQueue.
PendingPost是一個(gè)待發(fā)送事件, 里面保存了將要發(fā)送的事件及訂閱方法, 并且維護(hù)了一個(gè)靜態(tài)待發(fā)送事件池,達(dá)到復(fù)用的目的.
final class PendingPost {
//靜態(tài)待發(fā)送事件池
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;//將要發(fā)送的事件
Subscription subscription;//將要發(fā)送的訂閱關(guān)系
PendingPost next;//下一個(gè)待發(fā)送事件
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
//從池中獲取一個(gè)待發(fā)送事件
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
//從末尾獲取待發(fā)送事件, 保存?zhèn)鬟f進(jìn)來的事件和訂閱關(guān)系,返回
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
//如果池中數(shù)量為空, new一個(gè)PendingPost返回
return new PendingPost(event, subscription);
}
//釋放當(dāng)前pendingPost
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
//先把pendingPost中數(shù)據(jù)置空, 在保存在池中, 以復(fù)用
synchronized (pendingPostPool) {
//保證池中數(shù)量不超過10000
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
PendingPostQueue相當(dāng)于一個(gè)待發(fā)送事件隊(duì)列, 維護(hù)了一個(gè)頭和一個(gè)尾.
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
//把傳進(jìn)來的pendingPost放到隊(duì)列尾部
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}
//取出隊(duì)列的頭部
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
//隊(duì)列為空的話,等maxMillisToWait毫秒再取出頭部
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}
OK, 理解了這兩個(gè)類再來看三大Poster, 就好理解了.
- HandlerPoster主線程處理器
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;//待發(fā)送事件隊(duì)列
private final int maxMillisInsideHandleMessage;//處理消息超時(shí)時(shí)間
private final EventBus eventBus;
private boolean handlerActive;//是否正在處理消息
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();//初始化隊(duì)列
}
void enqueue(Subscription subscription, Object event) {
//從靜態(tài)池中取出一個(gè)PendingPost, 保存當(dāng)前事件和訂閱關(guān)系
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入隊(duì)列
queue.enqueue(pendingPost);
//如果當(dāng)前沒有正在處理消息, 則發(fā)送一個(gè)空消息, 觸發(fā)handleMessage去處理消息
if (!handlerActive) {
handlerActive = true;
//如果發(fā)送失敗, 拋異常
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;//是否重新處理
try {
long started = SystemClock.uptimeMillis();
//循環(huán)從隊(duì)列中取出一個(gè)待發(fā)送事件處理
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
//確保隊(duì)列中沒有待發(fā)送事件之后, handlerActive置為false, 退出循環(huán)
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//交給eventBus處理待發(fā)送的事件
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//如果消息處理超時(shí), 發(fā)送一個(gè)空消息, 重新出發(fā)消息處理
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
可以看到EventBus處理事件的時(shí)候, 通過mainThreadPoster.enqueue(subscription, event)把事件加入了mainThreadPoster的隊(duì)列中, Handler的消息處理機(jī)制不斷循環(huán)從隊(duì)列中取出待發(fā)送事件, 通過eventBus.invokeSubscriber(pendingPost)又傳給EventBus處理, 這就實(shí)現(xiàn)了線程的切換, 因?yàn)镋ventBus初始化mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10)時(shí)指定了Looper.getMainLooper(), 也就指定了mainThreadPoster 是主線程中的Handler.
然后來看下EventBus的invokeSubscriber(pendingPost)
void invokeSubscriber(PendingPost pendingPost) {
//取出待發(fā)送事件中的事件及訂閱關(guān)系
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
//把pendingPost置空,并加入到靜態(tài)待發(fā)送事件池中
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
//通過反射調(diào)用訂閱者的訂閱方法
invokeSubscriber(subscription, event);
}
}
- BackgroundPoster后臺(tái)線程處理器
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;//待發(fā)送事件隊(duì)列
private final EventBus eventBus;
private volatile boolean executorRunning;//是否正在處理
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();//初始化隊(duì)列
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
//當(dāng)前線程沒有運(yùn)行的話, 從線程池中new一個(gè)線程或取一個(gè)空閑線程來執(zhí)行run()方法
if (!executorRunning) {
executorRunning = true;
//eventBus.getExecutorService()默認(rèn)為Executors.newCachedThreadPool();
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
//循環(huán)從隊(duì)列中取出待發(fā)送事件,交給EventBus處理
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
//確保隊(duì)列中沒有事件, executorRunning置為false, 退出循環(huán)
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
//在當(dāng)前子線程中把待發(fā)送事件交給EventBus處理
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
BackgroundPoster 原理類似HandlerPoster, 把事件加入BackgroundPoster 的隊(duì)列中, 然后啟用了線程池中的線程來取出待發(fā)送事件, 并交給EventBus, 通過反射來處理事件.
- AsyncPoster 異步線程處理器
class AsyncPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
是不是跟BackgroundPoster 幾乎一樣, 不同點(diǎn)就在于每次enqueue加入隊(duì)列之后都調(diào)用了eventBus.getExecutorService()線程池中的一個(gè)新線程或空閑線程來執(zhí)行操作( 取出一個(gè)待發(fā)送事件, 交給EventBus處理), 及每個(gè)事件都是在獨(dú)立的線程中處理的. 而BackgroundPoster加入隊(duì)列之后, 只有在未運(yùn)行的時(shí)候才會(huì)從線程池中取一個(gè)新線程或空閑線程來執(zhí)行操作(循環(huán)從隊(duì)列中取出待發(fā)送事件并處理), 如果正在運(yùn)行, 就在當(dāng)前線程中執(zhí)行操作.
EventBusAnnotationProcessor
EventBus源碼中的EventBusAnnotationProcessor, 繼承了AbstractProcessor, 是的, 就是應(yīng)用APT技術(shù), 在編譯時(shí)掃描所有@Subscribe注解, 自動(dòng)生成MyEventBusIndex來保存所有訂閱者中的訂閱信息, 當(dāng)然MyEventBusIndex名稱可以自己配置, 具體配置方法可參考這篇文章.
首先看下生成的MyEventBusIndex的結(jié)構(gòu):
public class MyEventBusIndex implements SubscriberInfoIndex {
//鍵: 訂閱者對(duì)象 值: 訂閱者的訂閱信息
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
//掃描所有訂閱者中被@Subscribe注解的所有方法, 以SimpleSubscriberInfo對(duì)象的形式保存在SUBSCRIBER_INDEX中
putIndex(new SimpleSubscriberInfo(TestRunnerActivity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventMainThread", TestFinishedEvent.class, ThreadMode.MAIN),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusMain.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventMainThread", TestEvent.class, ThreadMode.MAIN),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.SubscribeClassEventBusDefault.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEvent", TestEvent.class),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscriberClassEventBusAsync.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventAsync", TestEvent.class, ThreadMode.ASYNC),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusBackground.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventBackgroundThread", TestEvent.class, ThreadMode.BACKGROUND),
}));
}
private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
//通過訂閱者對(duì)象, 取出所有訂閱信息
@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}
如何利用自動(dòng)生成的MyEventBusIndex呢?
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
public EventBusBuilder addIndex(SubscriberInfoIndex index) {
if(subscriberInfoIndexes == null) {
subscriberInfoIndexes = new ArrayList<>();
}
//把傳進(jìn)來的MyEventBusIndex保存到subscriberInfoIndexes中
subscriberInfoIndexes.add(index);
return this;
}
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." +
" It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
}
public EventBus build() {
return new EventBus(this);
}
MyEventBusIndex繼承了SubscriberInfoIndex, 把MyEventBusIndex加入到EventBusBuilder中, 再通過installDefaultEventBus生成EventBus實(shí)例, 傳到EventBus靜態(tài)實(shí)例defaultInstance中.到此就把MyEventBusIndex中的訂閱信息導(dǎo)入EventBus實(shí)例了.
還記得EventBus構(gòu)造方法中訂閱方法掃描器subscriberMethodFinder 的初始化嗎, 就是把EventBusBuilder的subscriberInfoIndexes信息傳給了掃描器.
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
然后來看通過MyEventBusIndex查找訂閱者的訂閱方法findUsingInfo.
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//查找訂閱信息
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//獲取到該訂閱者的所有訂閱方法
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//校驗(yàn)是否有重復(fù)的事件相同的訂閱方法, 沒有返回true
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
//沒有重復(fù), 把這個(gè)訂閱方法存到findState.subscriberMethods中
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//沒有找到在通過發(fā)射查找
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private SubscriberInfo getSubscriberInfo(FindState findState) {
//如果已經(jīng)找到訂閱者相關(guān)信息
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
//沒有訂閱者相關(guān)信息, 遍歷所有通過EventBusAnnotationProcessor生成的SubscriberInfoIndex 類, 查找對(duì)應(yīng)訂閱者的訂閱信息
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
關(guān)于EventBusAnnotationProcessor是如何在編譯期間掃描注解生成MyEventBusIndex的, 我也沒看太懂(尷尬...), 感興趣的可以自己去看源碼.
最后感謝:
http://m.itdecent.cn/p/f057c460c77e
http://blog.csdn.net/lsyz0021/article/details/51985307