SOFA 源碼分析— 事件總線(xiàn)

前言

大部分框架都是事件訂閱功能,即觀(guān)察者模式,或者叫事件機(jī)制。通過(guò)訂閱某個(gè)事件,當(dāng)觸發(fā)事件時(shí),回調(diào)某個(gè)方法。該功能非常的好用,而 SOFA 內(nèi)部也設(shè)計(jì)了這個(gè)功能,并且內(nèi)部大量使用了該功能。來(lái)看看是如何設(shè)計(jì)的。

源碼分析

核心類(lèi)有 3 個(gè):

  • EventBus 事件總線(xiàn)
  • Event 事件,即被觀(guān)察者
  • Subscriber 訂閱者,即觀(guān)察者

Subscriber 是個(gè)抽象類(lèi), 子類(lèi)需要自己實(shí)現(xiàn) onEvent 方法,即回調(diào)方法。還有一個(gè)是否同步執(zhí)行的參數(shù)。

EventBus 類(lèi)實(shí)現(xiàn)了注冊(cè)功能,反注冊(cè)功能(刪除)。事件發(fā)生時(shí)通知訂閱者功能。

內(nèi)部使用一個(gè)“大型數(shù)據(jù)結(jié)構(gòu)”保存事件和訂閱者的信息。

ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP

所有相關(guān)信息都保存在該數(shù)據(jù)結(jié)構(gòu)中。

看看注冊(cè)功能。

public static void register(Class<? extends Event> eventClass, Subscriber subscriber) {
    CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
    if (set == null) {
        set = new CopyOnWriteArraySet<Subscriber>();
        CopyOnWriteArraySet<Subscriber> old = SUBSCRIBER_MAP.putIfAbsent(eventClass, set);
        if (old != null) {
            set = old;
        }
    }
    set.add(subscriber);
}

參數(shù)為 一個(gè)事件對(duì)象,一個(gè)訂閱對(duì)象。

首先從 Map 中根據(jù)事件的 Class 獲取對(duì)應(yīng)的訂閱者集合,注意,這里都是用的并發(fā)容器。

下面的判斷有點(diǎn)意思,考慮到并發(fā)的情況,如果第一次獲取 Set 是 null,則嘗試創(chuàng)建一個(gè)并放進(jìn) Map,這里使用的并不是 put 方法,而是 putIfAbsent 方法,該方法作用等同于:

  if (!map.containsKey(key)) 
      return map.put(key, value);
  else
       return map.get(key);

所以,這里再一次考慮并發(fā)問(wèn)題,如果這個(gè)間隙有其他線(xiàn)程 put 了,就可以獲取到那個(gè)線(xiàn)程 put 的 Set。很謹(jǐn)慎。而且性能相比較鎖要好很多。雖然這個(gè)方法并發(fā)量不會(huì)很高,但也是一種性能優(yōu)化。

如果發(fā)生了并發(fā),就使用已有的 Set,然后將 Set 放置到 Map 中,完成事件和訂閱者的映射。

再看看取消注冊(cè)方法。

public static void unRegister(Class<? extends Event> eventClass, Subscriber subscriber) {
    CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
    if (set != null) {
        set.remove(subscriber);
    }
}

很簡(jiǎn)單,就是直接刪除。

再看看通知功能:

    public static void post(final Event event) {
        if (!isEnable()) {
            return;
        }
        CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
        if (CommonUtils.isNotEmpty(subscribers)) {
            for (final Subscriber subscriber : subscribers) {
                if (subscriber.isSync()) {
                    handleEvent(subscriber, event);
                } else { // 異步
                    AsyncRuntime.getAsyncThreadPool().execute(
                        new Runnable() {
                            @Override
                            public void run() {
                                handleEvent(subscriber, event);
                            }
                        });
                }
            }
        }
    }

首先看是否開(kāi)啟了總線(xiàn)功能,在性能測(cè)試的時(shí)候,可能是關(guān)閉的。

如果開(kāi)啟了,就根據(jù)給定的時(shí)間找到訂閱者,循環(huán)調(diào)用 handleEvent 方法(其實(shí)就是調(diào)用訂閱者的 onEvent 方法)。

這里有一個(gè)是否異步的判斷,如果異步的,則在異步線(xiàn)程池執(zhí)行。

這個(gè)異步線(xiàn)程池 AsyncRuntime 可以看一下:

public static ThreadPoolExecutor getAsyncThreadPool(boolean build) {
    if (asyncThreadPool == null && build) {
        synchronized (AsyncRuntime.class) {
            if (asyncThreadPool == null && build) {
                // 一些系統(tǒng)參數(shù),可以從配置或者注冊(cè)中心獲取。
                int coresize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_CORE);
                int maxsize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_MAX);
                int queuesize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_QUEUE);
                int keepAliveTime = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_TIME);

                BlockingQueue<Runnable> queue = ThreadPoolUtils.buildQueue(queuesize);
                NamedThreadFactory threadFactory = new NamedThreadFactory("SOFA-RPC-CB", true);

                RejectedExecutionHandler handler = new RejectedExecutionHandler() {
                    private int i = 1;

                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        if (i++ % 7 == 0) {
                            i = 1;
                            if (LOGGER.isWarnEnabled()) {
                                LOGGER.warn("Task:{} has been reject because of threadPool exhausted!" +
                                    " pool:{}, active:{}, queue:{}, taskcnt: {}", r,
                                    executor.getPoolSize(),
                                    executor.getActiveCount(),
                                    executor.getQueue().size(),
                                    executor.getTaskCount());
                            }
                        }
                        throw new RejectedExecutionException("Callback handler thread pool has bean exhausted");
                    }
                };
                asyncThreadPool = ThreadPoolUtils.newCachedThreadPool(
                    coresize, maxsize, keepAliveTime, queue, threadFactory, handler);
            }
        }
    }
    return asyncThreadPool;
}

這里也做了雙重檢查鎖。

默認(rèn)核心線(xiàn)程大小 10,最大 200, 隊(duì)列大小 256, 回收時(shí)間 60 秒。

因此,獲取的隊(duì)列就是 LinkedBlockingQueue。

這里的拒絕策略很有意思,每失敗 6 次,打印詳細(xì)信息,當(dāng)前線(xiàn)程數(shù),活動(dòng)線(xiàn)程數(shù)量,隊(duì)列 size, 任務(wù)總數(shù),不知道為什么這么設(shè)計(jì)(6次??)。

目前框架中 Event 的實(shí)現(xiàn)很多,我們?cè)谥暗脑创a分析中也看到很多了。而訂閱者目前只有一個(gè) FaultToleranceSubscriber。用于容錯(cuò)處理。是 FaultToleranceModule 模塊的功能。該功能也是個(gè)擴(kuò)展點(diǎn),當(dāng)系統(tǒng)初始化的時(shí)候,會(huì)注冊(cè) ClientSyncReceiveEvent 事件和 ClientAsyncReceiveEvent。

總結(jié)

這個(gè)事件總線(xiàn)功能真是觀(guān)察者模式的最佳實(shí)踐,通過(guò)系統(tǒng)中發(fā)生的事件,能夠讓外部模塊感知到并進(jìn)行處理,比如上面介紹的容錯(cuò)模塊。當(dāng)發(fā)生訂閱的事件后,外部模塊能夠響應(yīng),很完美。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,711評(píng)論 19 139
  • 1.ios高性能編程 (1).內(nèi)層 最小的內(nèi)層平均值和峰值(2).耗電量 高效的算法和數(shù)據(jù)結(jié)構(gòu)(3).初始化時(shí)...
    歐辰_OSR閱讀 30,286評(píng)論 8 265
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,355評(píng)論 25 708
  • 當(dāng)append操作一個(gè)切片的時(shí)候,如果操作之后的切片沒(méi)有超過(guò)原始切片的容量(cap)值時(shí),新產(chǎn)生的切片與操作的切片...
    cheyongzi閱讀 2,381評(píng)論 2 0
  • ccc
    menspg閱讀 125評(píng)論 0 0

友情鏈接更多精彩內(nèi)容