上一篇關(guān)于訂閱和取消訂閱的分析:http://m.itdecent.cn/p/3f08a23c4544
上一篇對訂閱和取消訂閱進行了一個源碼分析,簡單來講就是我們在類中調(diào)用@Subscribe所訂閱事件的方法在訂閱過程被封裝成了subscriberMethod對象并被逐一添加到subscriptionsByEventType和typesBySubscriber這兩個map中去,取消訂閱則是分別從這兩個map中移除相關(guān)的映射關(guān)系。
注冊訂閱事件后,接下來看一下是如何發(fā)送訂閱事件的,發(fā)送訂閱事件使用的是:
EventBus.getDefault().post(TestEvent())
點擊進去看一下post方法:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
/** Posts the given event to the event bus. */
public void post(Object event) {
//分裝成PostingThreadState 對象
PostingThreadState postingState = currentPostingThreadState.get();
//從postingState獲取事件隊列
List<Object> eventQueue = postingState.eventQueue;
// 將當(dāng)前要發(fā)送的事件加入到隊列中
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//while 不斷輪詢發(fā)送事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
currentPostingThreadState是一個ThreadLocal類型的變量(ThreadLocal的作用:ThreadLocal是解決線程安全問題一個很好的思路,它通過為每個線程提供一個獨立的變量副本解決了變量并發(fā)訪問的沖突問題。在很多情況下,ThreadLocal比直接使用synchronized同步機制解決線程安全問題更簡單,更方便,且結(jié)果程序擁有更高的并發(fā)性。)currentPostingThreadState中存儲了當(dāng)前線程對應(yīng)的事件列表和線程的狀態(tài)信息等,上述主要調(diào)用了輪詢調(diào)用postSingleEvent方法,看一下postSingleEvent方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//獲取該事件對象類型
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//是否支持事件繼承,默認(rèn)為true,如果訂閱了父類型,當(dāng)發(fā)送子類型事件實也會調(diào)用其相關(guān)訂閱方法
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 找不到該事件的異常處理
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
上述eventInheritance默認(rèn)為true,表示如果訂閱了父類型,當(dāng)發(fā)送子類型事件實也會調(diào)用其相關(guān)訂閱方法,最終是調(diào)用postSingleEventForEventType進行分發(fā),看一下postSingleEventForEventType:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
//CopyOnWriteArrayList是一個線程安全的list,寫入時會復(fù)制一份數(shù)據(jù)出來,之后再賦值回去
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//從subscriptionsByEventType map中獲取該eventType下的所有訂閱對象,subscriptionsByEventType會不會有點熟悉??
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 遍歷該eventType對應(yīng)下的訂閱對象,并調(diào)用postToSubscription執(zhí)行分發(fā)操作
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
subscriptionsByEventType會不會覺得有點熟悉??這個便是我們上一篇訂閱分析所提到的,這個map key為某一事件類型,value為該事件類型下的所有訂閱,我們從subscriptionsByEventType中獲取該eventType下的所有訂閱后對其進行遍歷,并逐一調(diào)用postToSubscription()把事件分發(fā)到每一個訂閱對象中去,繼續(xù)看postToSubscription,這里我們會根據(jù)訂閱方法指定的threadMode信息來執(zhí)行不同的發(fā)布策略:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
threadMode總共有以下幾種類型
POSTING:執(zhí)行invokeSubscriber()方法,就是直接反射調(diào)用;
MAIN:首先去判斷當(dāng)前是否在UI線程,如果是的話則直接反射調(diào)用,否則調(diào)用mainThreadPoster#enqueue(),即把當(dāng)前的方法加入到隊列之中,然后通過handler去發(fā)送一個消息,在handler的handleMessage中去執(zhí)行方法。具體邏輯在HandlerPoster.java中;
MAIN_ORDERED:與上面邏輯類似,順序執(zhí)行我們的方法;
BACKGROUND:判斷當(dāng)前是否在UI線程,如果不是的話直接反射調(diào)用,是的話通過backgroundPoster.enqueue()將方法加入到后臺的一個隊列,最后通過線程池去執(zhí)行;
ASYNC:與BACKGROUND的邏輯類似,將任務(wù)加入到后臺的一個隊列,最終由Eventbus中的一個線程池去調(diào)用,這里的線程池與BACKGROUND邏輯中的線程池用的是同一個。
這里先取一個分支來看,假設(shè)我們指定事件監(jiān)聽最后是回到主線程,也即是平常常使用的 @Subscribe(threadMode = ThreadMode.MAIN),那么這里將會來到 case MAIN:分支,第一步先判斷發(fā)送事件的時候(即調(diào)用event post)是不是在主線程,是的話直接執(zhí)行invokeSubscriber()使用反射執(zhí)行方法
void invokeSubscriber(Subscription subscription, Object event) {
try {
//使用反射執(zhí)行
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
如果發(fā)送事件的時候(即調(diào)用event post)不是在主線程,則執(zhí)行mainThreadPoster.enqueue(subscription, event)方法,那這個mainThreadPoster是什么呢?
//簡化代碼
private final Poster mainThreadPoster;
....
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
....
MainThreadSupport getMainThreadSupport() {
if (mainThreadSupport != null) {
return mainThreadSupport;
} else if (AndroidLogger.isAndroidLogAvailable()) {
Object looperOrNull = getAndroidMainLooperOrNull();
return looperOrNull == null ? null :
new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
} else {
return null;
}
}
....
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
}
可以看到,mainThreadPoster內(nèi)部創(chuàng)建了一個主線程Looper,并最終new了一個HandlerPoster,HandlerPoster是mainThreadPoster的實現(xiàn)類,這里大概可以猜到mainThreadPoster其實就是主線程的handler,看一下HandlerPoster如何實現(xiàn),先看一下enqueue方法:
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//構(gòu)建一個PendingPost對象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//添加進PendingPost隊列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//調(diào)用sendMessage發(fā)送消息,從而觸發(fā)handleMessage
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
......省略
PendingPostQueue是一個簡單實現(xiàn)的鏈表,內(nèi)部保存了兩個PendingPost對象 ,一頭一尾,尾部插入,頭部移除,調(diào)用enqueue()從鏈表尾部插入,調(diào)用poll()從鏈表頭部移除,每次插入后調(diào)用sendMessage從而回調(diào)到handleMessage,看一下handleMessage是如何從這個PendingPostQueue取出這個訂閱事件的:
......省略
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
//記錄開始時間
long started = SystemClock.uptimeMillis();
while (true) {
//while循環(huán)一直從PendingPostQueue取出PendingPost
PendingPost pendingPost = queue.poll();
//如果隊列為空則取消
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//這里同樣使用反射調(diào)用了方法
eventBus.invokeSubscriber(pendingPost);
//判斷執(zhí)行是否超過了指定時間,是的話重新調(diào)用sendMessage方法
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
可以看出,其實mainThreadPoster主要是幫我們回調(diào)到主線程,其內(nèi)部本質(zhì)上還是調(diào)用了反射去執(zhí)行方法,關(guān)于handleMessage,第一步是用while循環(huán)不斷輪詢?nèi)〕鲫犃兄械腜endingPost,當(dāng)隊列為空則停止輪詢,當(dāng)執(zhí)行處理方法的時長過長時則重新調(diào)用sendMessage,從而繼續(xù)回調(diào)到handleMessage,這是為了防止執(zhí)行時間過長,導(dǎo)致while循環(huán)阻塞主線程造成卡頓。
本篇博文到此結(jié)束,關(guān)于其他類型的threadMode執(zhí)行操作,可自行在研究,差不多。