
前言
大部分框架都是事件訂閱功能,即觀(guān)察者模式,或者叫事件機(jī)制。通過(guò)訂閱某個(gè)事件,當(dāng)觸發(fā)事件時(shí),回調(diào)某個(gè)方法。該功能非常的好用,而 SOFA 內(nèi)部也設(shè)計(jì)了這個(gè)功能,并且內(nèi)部大量使用了該功能。來(lái)看看是如何設(shè)計(jì)的。
源碼分析
核心類(lèi)有 3 個(gè):
- EventBus 事件總線(xiàn)
- Event 事件,即被觀(guān)察者
- Subscriber 訂閱者,即觀(guān)察者
Subscriber 是個(gè)抽象類(lèi), 子類(lèi)需要自己實(shí)現(xiàn) onEvent 方法,即回調(diào)方法。還有一個(gè)是否同步執(zhí)行的參數(shù)。
EventBus 類(lèi)實(shí)現(xiàn)了注冊(cè)功能,反注冊(cè)功能(刪除)。事件發(fā)生時(shí)通知訂閱者功能。
內(nèi)部使用一個(gè)“大型數(shù)據(jù)結(jié)構(gòu)”保存事件和訂閱者的信息。
ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP
所有相關(guān)信息都保存在該數(shù)據(jù)結(jié)構(gòu)中。
看看注冊(cè)功能。
public static void register(Class<? extends Event> eventClass, Subscriber subscriber) {
CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
if (set == null) {
set = new CopyOnWriteArraySet<Subscriber>();
CopyOnWriteArraySet<Subscriber> old = SUBSCRIBER_MAP.putIfAbsent(eventClass, set);
if (old != null) {
set = old;
}
}
set.add(subscriber);
}
參數(shù)為 一個(gè)事件對(duì)象,一個(gè)訂閱對(duì)象。
首先從 Map 中根據(jù)事件的 Class 獲取對(duì)應(yīng)的訂閱者集合,注意,這里都是用的并發(fā)容器。
下面的判斷有點(diǎn)意思,考慮到并發(fā)的情況,如果第一次獲取 Set 是 null,則嘗試創(chuàng)建一個(gè)并放進(jìn) Map,這里使用的并不是 put 方法,而是 putIfAbsent 方法,該方法作用等同于:
if (!map.containsKey(key))
return map.put(key, value);
else
return map.get(key);
所以,這里再一次考慮并發(fā)問(wèn)題,如果這個(gè)間隙有其他線(xiàn)程 put 了,就可以獲取到那個(gè)線(xiàn)程 put 的 Set。很謹(jǐn)慎。而且性能相比較鎖要好很多。雖然這個(gè)方法并發(fā)量不會(huì)很高,但也是一種性能優(yōu)化。
如果發(fā)生了并發(fā),就使用已有的 Set,然后將 Set 放置到 Map 中,完成事件和訂閱者的映射。
再看看取消注冊(cè)方法。
public static void unRegister(Class<? extends Event> eventClass, Subscriber subscriber) {
CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
if (set != null) {
set.remove(subscriber);
}
}
很簡(jiǎn)單,就是直接刪除。
再看看通知功能:
public static void post(final Event event) {
if (!isEnable()) {
return;
}
CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
if (CommonUtils.isNotEmpty(subscribers)) {
for (final Subscriber subscriber : subscribers) {
if (subscriber.isSync()) {
handleEvent(subscriber, event);
} else { // 異步
AsyncRuntime.getAsyncThreadPool().execute(
new Runnable() {
@Override
public void run() {
handleEvent(subscriber, event);
}
});
}
}
}
}
首先看是否開(kāi)啟了總線(xiàn)功能,在性能測(cè)試的時(shí)候,可能是關(guān)閉的。
如果開(kāi)啟了,就根據(jù)給定的時(shí)間找到訂閱者,循環(huán)調(diào)用 handleEvent 方法(其實(shí)就是調(diào)用訂閱者的 onEvent 方法)。
這里有一個(gè)是否異步的判斷,如果異步的,則在異步線(xiàn)程池執(zhí)行。
這個(gè)異步線(xiàn)程池 AsyncRuntime 可以看一下:
public static ThreadPoolExecutor getAsyncThreadPool(boolean build) {
if (asyncThreadPool == null && build) {
synchronized (AsyncRuntime.class) {
if (asyncThreadPool == null && build) {
// 一些系統(tǒng)參數(shù),可以從配置或者注冊(cè)中心獲取。
int coresize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_CORE);
int maxsize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_MAX);
int queuesize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_QUEUE);
int keepAliveTime = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_TIME);
BlockingQueue<Runnable> queue = ThreadPoolUtils.buildQueue(queuesize);
NamedThreadFactory threadFactory = new NamedThreadFactory("SOFA-RPC-CB", true);
RejectedExecutionHandler handler = new RejectedExecutionHandler() {
private int i = 1;
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (i++ % 7 == 0) {
i = 1;
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Task:{} has been reject because of threadPool exhausted!" +
" pool:{}, active:{}, queue:{}, taskcnt: {}", r,
executor.getPoolSize(),
executor.getActiveCount(),
executor.getQueue().size(),
executor.getTaskCount());
}
}
throw new RejectedExecutionException("Callback handler thread pool has bean exhausted");
}
};
asyncThreadPool = ThreadPoolUtils.newCachedThreadPool(
coresize, maxsize, keepAliveTime, queue, threadFactory, handler);
}
}
}
return asyncThreadPool;
}
這里也做了雙重檢查鎖。
默認(rèn)核心線(xiàn)程大小 10,最大 200, 隊(duì)列大小 256, 回收時(shí)間 60 秒。
因此,獲取的隊(duì)列就是 LinkedBlockingQueue。
這里的拒絕策略很有意思,每失敗 6 次,打印詳細(xì)信息,當(dāng)前線(xiàn)程數(shù),活動(dòng)線(xiàn)程數(shù)量,隊(duì)列 size, 任務(wù)總數(shù),不知道為什么這么設(shè)計(jì)(6次??)。
目前框架中 Event 的實(shí)現(xiàn)很多,我們?cè)谥暗脑创a分析中也看到很多了。而訂閱者目前只有一個(gè) FaultToleranceSubscriber。用于容錯(cuò)處理。是 FaultToleranceModule 模塊的功能。該功能也是個(gè)擴(kuò)展點(diǎn),當(dāng)系統(tǒng)初始化的時(shí)候,會(huì)注冊(cè) ClientSyncReceiveEvent 事件和 ClientAsyncReceiveEvent。
總結(jié)
這個(gè)事件總線(xiàn)功能真是觀(guān)察者模式的最佳實(shí)踐,通過(guò)系統(tǒng)中發(fā)生的事件,能夠讓外部模塊感知到并進(jìn)行處理,比如上面介紹的容錯(cuò)模塊。當(dāng)發(fā)生訂閱的事件后,外部模塊能夠響應(yīng),很完美。