前言
前期加班加點(diǎn)趕項(xiàng)目, 趁著剛上線空兩天,趕緊看下EventBus做個(gè)"思維復(fù)健"
使用
EventBus的使用非常簡(jiǎn)單, 如果使用默認(rèn)的EventBus, 我們一般只會(huì)使用到以下三個(gè)API
- 綁定
EventBus.getDefault().regisiter(this);
- 發(fā)送信息
EventBus.getDefault().post(new Event());
- 解綁
EventBus.getDefault().unregisiter(this);
EventBus.getDefault()
EventBus內(nèi)部維護(hù)了一個(gè)單例, 通過getDefault我們可以獲取默認(rèn)單例來進(jìn)行綁定和發(fā)送動(dòng)作, 但是當(dāng)我們需要進(jìn)行一些關(guān)于log, 是否未有訂閱者情況的響應(yīng)處理時(shí), 我們可以通過EventBusBuilder通過構(gòu)建者模式來進(jìn)行配置處理,本篇解析僅分析默認(rèn)情況下的流程代碼
綁定
老規(guī)矩, 我們先上代碼
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 獲取對(duì)應(yīng)subscriber類的訂閱方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 遍歷執(zhí)行訂閱
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
我們根據(jù)傳遞的訂閱者來獲取相關(guān)的訂閱方法, 然后遍歷執(zhí)行訂閱的動(dòng)作. 我們首先看下如果查找訂閱者的所有訂閱方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 從緩存中查找訂閱方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
// 緩存中有, 直接返回
if (subscriberMethods != null) {
return subscriberMethods;
}
// 查找注冊(cè)方法, 默認(rèn)false
if (ignoreGeneratedIndex) {
// 使用反射查找
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 使用注解器生成的類查找
subscriberMethods = findUsingInfo(subscriberClass);
}
// 如果沒有訂閱方法, 則拋出異常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 否則加入緩存中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
我們知道EventBus3.0版本后通過@Subscribe注解來標(biāo)注對(duì)應(yīng)的訂閱方法, 可以看到通過findUsingInfo方法查詢訂閱方法, 如果沒有訂閱方法, 會(huì)拋出異常, 而如果找到了, 則會(huì)加入緩存METHOD_CACHE進(jìn)行內(nèi)部維護(hù), 這個(gè)方法可以優(yōu)化部分性能, 減少反射帶來的性能問題.
我們?cè)谕?code>findUsingInfo里看, 會(huì)發(fā)現(xiàn)如果找不到相關(guān)訂閱者信息的時(shí)候, 仍會(huì)通過反射來尋找.
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
// 遍歷訂閱者方法
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 沒有訂閱信息, 從反射來找
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
我們回頭去看subscribe訂閱動(dòng)作的執(zhí)行代碼
/**
* 訂閱動(dòng)作
* @param subscriber 訂閱者(類似訂閱的Activity之類)
* @param subscriberMethod 訂閱事件方法, 比如加了@Subscribe注解的方法
*/
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 訂閱事件的類, 比如平常傳遞的自己寫的EventLogin等等..
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 獲取與eventType有關(guān)的訂閱事件的隊(duì)列
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
// 如果為空
if (subscriptions == null) {
// 初始隊(duì)列
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 如果管理的訂閱隊(duì)列存在新的訂閱事件, 則拋出已注冊(cè)事件的異常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
// 遍歷訂閱的事件
for (int i = 0; i <= size; i++) {
// 根據(jù)優(yōu)先級(jí), 插入訂閱事件
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 以訂閱者為key, value為訂閱事件的類的隊(duì)列
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 是否粘性事件
if (subscriberMethod.sticky) {
// 是否分發(fā)訂閱了響應(yīng)事件類父類事件的方法, 默認(rèn)為true
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
// stickyEvents 粘性事件集合, key為eventType的類, value是eventType對(duì)象
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
// 獲取候選eventType
Class<?> candidateEventType = entry.getKey();
// native方法, 應(yīng)該是判斷當(dāng)前注冊(cè)eventType與候選緩存的eventType是否匹配
if (eventType.isAssignableFrom(candidateEventType)) {
// 如果匹配, 校驗(yàn)并發(fā)送訂閱
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
相關(guān)注釋都在代碼里, 這塊的流程我們可以梳理成以下步驟:
- 獲取我們訂閱時(shí)間傳輸?shù)念?code>EventType, 初始化內(nèi)部維護(hù)的兩個(gè)集合, 分別是
subscriptionsByEventType和typesBySubscriber, 根據(jù)命名我們也可以理解, 一個(gè)是根據(jù)eventType區(qū)分的訂閱者隊(duì)列, 一個(gè)是根據(jù)subscriber(訂閱者)區(qū)分的eventType隊(duì)列, 分別向?qū)?yīng)的集合內(nèi)添加對(duì)應(yīng)新的訂閱者和訂閱事件 - 根據(jù)是否粘性事件判斷是否需要調(diào)用
checkPostStickyEventToSubscription直接發(fā)送信息給訂閱者 -
checkPostStickyEventToSubscription內(nèi)部判斷事件是否被中斷來判斷是否會(huì)調(diào)用到postToSubscription, 就是發(fā)送信息給訂閱者
發(fā)送信息
/**
* 事件發(fā)送
*/
/** Posts the given event to the event bus. */
public void post(Object event) {
// currentPostingThreadState 為ThreadLocal對(duì)象
// 獲取當(dāng)前線程的發(fā)送狀態(tài)
PostingThreadState postingState = currentPostingThreadState.get();
// 獲取當(dāng)前線程的事件發(fā)送隊(duì)列
List<Object> eventQueue = postingState.eventQueue;
// 添加事件
eventQueue.add(event);
// 如果不在發(fā)送中
if (!postingState.isPosting) {
// 判斷是否在主線程
postingState.isMainThread = isMainThread();
// 修改發(fā)送中狀態(tài)
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 遍歷發(fā)送隊(duì)列事件
while (!eventQueue.isEmpty()) {
// 從隊(duì)頭開始發(fā)送, 同時(shí)移除隊(duì)列中的對(duì)應(yīng)事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 修改發(fā)送中狀態(tài), 修改主線程判斷
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
由于在業(yè)務(wù)場(chǎng)景中, 無法判斷發(fā)送信息在什么線程下執(zhí)行的, 所以內(nèi)部維護(hù)的currentPostingThreadState是一個(gè)ThreadLocal對(duì)象, 它可以保證當(dāng)前線程的數(shù)據(jù)不會(huì)被其他線程共享.在post中, 我們就能看到EventBus會(huì)根據(jù)當(dāng)前線程, 將事件發(fā)送給當(dāng)前線程的隊(duì)列中, 然后遍歷執(zhí)行postSingleEvent進(jìn)行單個(gè)事件的發(fā)送, 同時(shí)移除掉隊(duì)列中已發(fā)送的事件
/**
* 發(fā)送單個(gè)事件
* @param event
* @param postingState
* @throws Error
*/
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// event 是對(duì)應(yīng)eventType的實(shí)例
Class<?> eventClass = event.getClass();
// 默認(rèn)沒有找到訂閱者
boolean subscriptionFound = false;
// 默認(rèn)true, 判斷是否觸發(fā)eventType的父類或接口的訂閱
if (eventInheritance) {
// 查找獲取所有eventType的父類和接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
// 循環(huán)發(fā)送
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 如果沒有找到訂閱者
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
// 如果我們的builder配置了sendNoSubscriberEvent(默認(rèn)為true)
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
// 會(huì)發(fā)送一個(gè)NoSubscriberEvent的事件, 如果需要判斷無訂閱者時(shí)候的觸發(fā)情況, 可以接收這個(gè)事件做處理
post(new NoSubscriberEvent(this, event));
}
}
}
這里的流程我們可以分成兩步:
- 通過
postSingleEventForEventType根據(jù)eventType查找對(duì)應(yīng)的訂閱者, 如果找到, 則發(fā)送事件 - 如果沒有找到訂閱者, 根據(jù)構(gòu)造器內(nèi)我們通過
sendNoSubscriberEvent的配置, 來判斷是否需要發(fā)送一個(gè)無訂閱者響應(yīng)事件
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 根據(jù)eventType獲取訂閱者
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 發(fā)送給訂閱者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postSingleEventForEventType方法就執(zhí)行了上面的第一步動(dòng)作, 如果找到了訂閱者, 就會(huì)返回true; 否則, 返回false.最終我們通過調(diào)用postToSubscription將事件發(fā)送給訂閱者.
咱們繼續(xù)往下走.
/**
* 訂閱發(fā)布
* @param subscription 新注冊(cè)的訂閱者
* @param event eventType
* @param isMainThread 是否主線程
*/
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 訂閱方法的指定線程
switch (subscription.subscriberMethod.threadMode) {
// 相同線程內(nèi)
case POSTING:
invokeSubscriber(subscription, event);
break;
// 主線程內(nèi), 不阻塞
case MAIN:
if (isMainThread) {
// 訂閱者的調(diào)用
invokeSubscriber(subscription, event);
} else {
// 通過handler處理
mainThreadPoster.enqueue(subscription, event);
}
break;
// 主線程, 阻塞
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
// 后臺(tái)線程,
case BACKGROUND:
if (isMainThread) {
// 實(shí)現(xiàn)了Runnable
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
// 異步線程
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
默認(rèn)的線程模式一般是POSTING會(huì)走發(fā)送信息時(shí)所在的線程, 這樣避免了線程切換所存在的可能開銷.我們首先看下invokeSubscriber方法, 它的作用就是做到了訂閱者的調(diào)用
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 訂閱方法的調(diào)用
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
其實(shí)可以看出, 這里就是獲取訂閱方法, 通過反射將事件作為參數(shù)調(diào)用.
我們看下走UI線程的流程, 在判斷當(dāng)前線程非主線程的情況下, 我們會(huì)調(diào)用到mainThreadPoster.enqueue(subscription, event);,
首先, 我們回到EventBus的構(gòu)造函數(shù)中, 找到mainThreadPoster的相關(guān)申明
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
}
可以看到他是個(gè)HandlerPoster對(duì)象, 然后再回來看HandlerPoster.enqueue對(duì)應(yīng)的代碼
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
這里首先維護(hù)了內(nèi)部的PendingPost, 并且將對(duì)應(yīng)的pendingPost加入執(zhí)行隊(duì)列中.HandlerPoster繼承于Handler, 根據(jù)他前面?zhèn)魅氲腖ooper可以判定保證信息的執(zhí)行是在主線程中做處理的, 現(xiàn)在我們看下handleMessage的處理
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 調(diào)用訂閱者
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
這里做的處理, 主要就是調(diào)用了EventBus對(duì)象的invokeSubscriber方法, 最終走到了訂閱者的方法的執(zhí)行.
至于其他的幾個(gè)線程模式, 查看對(duì)應(yīng)的POST也可以大致知道他的原理, 這里就暫且不表了.
解綁
相對(duì)前面的, 其實(shí)解綁的邏輯就非常簡(jiǎn)單了, 我們先看代碼
/**
* 解綁
*/
/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
// 根據(jù)訂閱者獲取對(duì)應(yīng)的eventType
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
// 如果不為空
if (subscribedTypes != null) {
// 遍歷解綁
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 移除相關(guān)的eventType
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
我們?cè)诮壎ㄏ嚓P(guān)的解析中, 已經(jīng)知道其實(shí)內(nèi)部管理訂閱事件和訂閱者是通過typesBySubscriber和subscriptionsByEventType來實(shí)現(xiàn)的, 而這里就是移除掉與對(duì)應(yīng)訂閱者相關(guān)的對(duì)象即可.