Guava之EventBus原理

EventBus是Guava的事件處理機(jī)制,是設(shè)計(jì)模式中的觀察者模式(生產(chǎn)/消費(fèi)者編程模型)的優(yōu)雅實(shí)現(xiàn)。對(duì)于事件監(jiān)聽和發(fā)布訂閱模式,EventBus是一個(gè)非常優(yōu)雅和簡(jiǎn)單解決方案,我們不用創(chuàng)建復(fù)雜的類和接口層次結(jié)構(gòu)。

demo

public static void main(String...args) {
    // 定義一個(gè)EventBus對(duì)象,這里的Joker是該對(duì)象的id
    EventBus eventBus = new EventBus("Joker");
    // 向上述EventBus對(duì)象中注冊(cè)一個(gè)監(jiān)聽對(duì)象   
    eventBus.register(new EventListener());
    // 使用EventBus發(fā)布一個(gè)事件,該事件會(huì)給通知到所有注冊(cè)的監(jiān)聽者
    eventBus.post(new Event("Hello every listener, joke begins..."));
}

// 事件,監(jiān)聽者監(jiān)聽的事件的包裝對(duì)象
public static class Event {
    public String message;
    Event(String message) {
        this.message = message;
    }
}

// 監(jiān)聽者
public static class EventListener {
    // 監(jiān)聽的方法,必須使用注解聲明,且只能有一個(gè)參數(shù),實(shí)際觸發(fā)一個(gè)事件的時(shí)候會(huì)根據(jù)參數(shù)類型觸發(fā)方法
    @Subscribe
    public void listen(Event event) {
        System.out.println("Event listener 1 event.message = " + event.message);
    }
}

這里我們封裝了一個(gè)事件對(duì)象Event,一個(gè)監(jiān)聽者對(duì)象EventListener。然后,我們用EventBus的構(gòu)造方法創(chuàng)建了一個(gè)EventBus實(shí)例,并將上述監(jiān)聽者實(shí)例注冊(cè)進(jìn)去。然后,我們使用上述EventBus實(shí)例發(fā)布一個(gè)事件Event。然后,以上注冊(cè)的監(jiān)聽者中的使用@Subscribe注解聲明并且只有一個(gè)Event類型的參數(shù)的方法將會(huì)在觸發(fā)事件的時(shí)候被觸發(fā)。

從上面的使用中,我們可以看出,EventBus與觀察者模式不同的地方在于:當(dāng)注冊(cè)了一個(gè)監(jiān)聽者的時(shí)候,只有當(dāng)某個(gè)方法使用了@Subscribe注解聲明并且參數(shù)與發(fā)布的事件類型匹配,那么這個(gè)方法才會(huì)被觸發(fā)。這就是說,同一個(gè)監(jiān)聽者可以監(jiān)聽多種類型的事件,也可以在多次監(jiān)聽同一個(gè)事件。

源碼分析

下面我們來分析一下在Guava中是如何為我們實(shí)現(xiàn)這個(gè)API的。不過,首先,我們還是先試著考慮一下自己設(shè)計(jì)這個(gè)API的時(shí)候如何設(shè)計(jì),并且提出幾個(gè)問題,然后帶著問題到源碼中尋找答案。

假如要我們?nèi)ピO(shè)計(jì)這樣一個(gè)API,最簡(jiǎn)單的方式就是在觀察者模式上進(jìn)行拓展:每次調(diào)用EventBus.post()方法的時(shí)候,會(huì)對(duì)所有的觀察者對(duì)象進(jìn)行遍歷,然后獲取它們?nèi)康姆椒?,判斷該方法是否使用了@Subscribe并且方法的參數(shù)類型是否與post()方法發(fā)布的事件類型一致,如果一致的話,那么我們就使用反射來觸發(fā)這個(gè)方法。在觀察者模式中,每個(gè)觀察者都要實(shí)現(xiàn)一個(gè)接口,發(fā)布事件的時(shí)候,我們只要調(diào)用接口的方法就行,但是EventBus把這個(gè)限制設(shè)定得更加寬泛,也就是監(jiān)聽者無需實(shí)現(xiàn)任何接口,只要方法使用了注解并且參數(shù)匹配即可。

這里面不僅要對(duì)所有的監(jiān)聽者進(jìn)行遍歷,還要對(duì)它們的方法進(jìn)行遍歷,找到了匹配的方法之后又要使用反射來觸發(fā)這個(gè)方法。首先,當(dāng)注冊(cè)的監(jiān)聽者數(shù)量比較多的時(shí)候,鏈?zhǔn)秸{(diào)用的效率就不高;然后我們又要使用反射來觸發(fā)匹配的方法,這樣效率肯定又低了一些。那么在Guava的EventBus中是如何解決這兩個(gè)問題的?

EventBus事件總線

  • register:把監(jiān)聽器中申明的所有訂閱事件方法注冊(cè)到SubscriberRegistry(訂閱者注冊(cè)器)中。
  • post:發(fā)布事件給所有已注冊(cè)過的訂閱者,最終開啟線程完成訂閱方法。
@Beta
public class EventBus {
  private final String identifier;//事件總線標(biāo)識(shí):用于自定義標(biāo)識(shí)這個(gè)事件總線
  private final Executor executor;//默認(rèn)的線程執(zhí)行器,用于把事件轉(zhuǎn)發(fā)給訂閱者
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱注冊(cè)器
  private final Dispatcher dispatcher;//事件轉(zhuǎn)發(fā)器
 15   //構(gòu)造器:使用默認(rèn)字符串
  public EventBus() {
    this("default");
  }
  //構(gòu)造器:使用自定義字符串
  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  } 58 
  //注冊(cè)監(jiān)聽者中申明的所有訂閱方法(@Subscribe標(biāo)記的),用以接收事件
  public void register(Object object) {
    subscribers.register(object);
  }
  // 解除訂閱
  public void unregister(Object object) {
    subscribers.unregister(object);
  }

  //發(fā)布事件給所有已注冊(cè)過的訂閱者
  public void post(Object event) {
        // 找到事件的所有訂閱者
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
          // 事件轉(zhuǎn)發(fā)器,把事件轉(zhuǎn)發(fā)給訂閱者
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // 如果該事件即沒有訂閱者,也沒事DeadEvent,那么封裝成DeadEvent并重新發(fā)布
      post(new DeadEvent(this, event));
    }
...省略非重要方法167 }
  • subscribers是SubscriberRegistry類型的,實(shí)際上EventBus在添加、移除和遍歷觀察者的時(shí)候都會(huì)使用該實(shí)例的方法,所有的觀察者信息也都維護(hù)在該實(shí)例中.
  • executor是事件分發(fā)過程中使用到的線程池,可以自己實(shí)現(xiàn); dispatcher是Dispatcher類型的子類,用來在發(fā)布事件的時(shí)候分發(fā)消息給監(jiān)聽者,它有幾個(gè)默認(rèn)的實(shí)現(xiàn),分別針對(duì)不同的分發(fā)方式;
  • exceptionHandler是SubscriberExceptionHandler類型的,它用來處理異常信息,在默認(rèn)的EventBus實(shí)現(xiàn)中,會(huì)在出現(xiàn)異常的時(shí)候打印出log,當(dāng)然我們也可以定義自己的異常處理策咯。

如果我們想要了解EventBus是如何注冊(cè)和取消注冊(cè)以及如何遍歷來觸發(fā)事件的,就應(yīng)該從SubscriberRegistry入手.

我們需要在EventBus中維護(hù)幾個(gè)映射,以便在發(fā)布事件的時(shí)候找到并通知所有的監(jiān)聽者,首先是事件類型->觀察者列表的映射。

EventBus中發(fā)布事件是針對(duì)各個(gè)方法的,我們將一個(gè)事件對(duì)應(yīng)的類型信息和方法信息等都維護(hù)在一個(gè)對(duì)象中,在EventBus中就是觀察者Subscriber. 然后,通過事件類型映射到觀察者列表,當(dāng)發(fā)布事件的時(shí)候,只要根據(jù)事件類型到列表中尋找所有的觀察者并觸發(fā)監(jiān)聽方法即可。 在SubscriberRegistry中通過如下數(shù)據(jù)結(jié)構(gòu)來完成這一映射:

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();

從上面的定義形式中我們可以看出,這里使用的是事件的Class類型映射到Subscriber列表的。這里的Subscriber列表使用的是Java中的CopyOnWriteArraySet集合,
它底層使用了CopyOnWriteArrayList,并對(duì)其進(jìn)行了封裝,也就是在基本的集合上面增加了去重的操作。這是一種適用于讀多寫少場(chǎng)景的集合,在讀取數(shù)據(jù)的時(shí)候不會(huì)加鎖,
寫入數(shù)據(jù)的時(shí)候進(jìn)行加鎖,并且會(huì)進(jìn)行一次數(shù)組拷貝。

既然,我們已經(jīng)知道了在SubscriberRegistry內(nèi)部會(huì)在注冊(cè)的時(shí)候向以上數(shù)據(jù)結(jié)構(gòu)中插入映射,那么我們可以具體看下它是如何完成這一操作的。

在分析register()方法之前,我們先看下SubscriberRegistry內(nèi)部經(jīng)常使用的幾個(gè)方法,它們的原理與我們上面提出的問題息息相關(guān)。
首先是findAllSubscribers()方法,它用來獲取指定監(jiān)聽者對(duì)應(yīng)的全部觀察者集合。下面是它的代碼:

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    // 創(chuàng)建一個(gè)哈希表
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    // 獲取監(jiān)聽者的類型
    Class<?> clazz = listener.getClass();
    // 獲取上述監(jiān)聽者的全部監(jiān)聽方法
    UnmodifiableIterator var4 = getAnnotatedMethods(clazz).iterator(); // 1
    // 遍歷上述方法,并且根據(jù)方法和類型參數(shù)創(chuàng)建觀察者并將其插入到映射表中
    while(var4.hasNext()) {
        Method method = (Method)var4.next();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // 事件類型
        Class<?> eventType = parameterTypes[0];
        methodsInListener.put(eventType, Subscriber.create(this.bus, listener, method));
    }
    return methodsInListener;
}

這里注意一下Multimap數(shù)據(jù)結(jié)構(gòu),它是Guava中提供的集合結(jié)構(gòu),與普通的哈希表不同的地方在于,它可以完成一對(duì)多操作。這里用來存儲(chǔ)事件類型到觀察者的一對(duì)多映射。

  1. 調(diào)用SubscriberRegistry的register(listener)來執(zhí)行注冊(cè)監(jiān)聽器。
  2. register步驟如下:
    EventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以維護(hù)事件和訂閱者的映射

注意下1處的代碼,我們上面也提到過,當(dāng)新注冊(cè)監(jiān)聽者的時(shí)候,用反射獲取全部方法并進(jìn)行判斷的過程非常浪費(fèi)性能,而這里就是這個(gè)問題的答案:

這里getAnnotatedMethods()方法會(huì)嘗試從subscriberMethodsCache中獲取所有的注冊(cè)監(jiān)聽的方法(即使用了注解并且只有一個(gè)參數(shù)),下面是這個(gè)方法的定義:

private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    return (ImmutableList)subscriberMethodsCache.getUnchecked(clazz);
}

這里的subscriberMethodsCache的定義是:

private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
    public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { // 2
        return SubscriberRegistry.getAnnotatedMethodsNotCached(concreteClass);
    }
});

這里的作用機(jī)制是:當(dāng)使用subscriberMethodsCache.getUnchecked(clazz)獲取指定監(jiān)聽者中的方法的時(shí)候會(huì)先嘗試從緩存中進(jìn)行獲取,如果緩存中不存在就會(huì)執(zhí)行2處的代碼,
調(diào)用SubscriberRegistry中的getAnnotatedMethodsNotCached()方法獲取這些監(jiān)聽方法。其實(shí)就是使用反射并完成一些校驗(yàn),并不復(fù)雜。

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
       //獲取超類class集合
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    //遍歷超類
    for (Class<?> supertype : supertypes) {
      //遍歷超類中的所有定義的方法  
         for (Method method : supertype.getDeclaredMethods()) {
           //如果方法上有@Subscribe注解
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
          // 方法的參數(shù)類型數(shù)組
          Class<?>[] parameterTypes = method.getParameterTypes();
          // 校驗(yàn):事件訂閱方法必須只能有一個(gè)參數(shù),即事件類
        checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters."
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);
      // 封裝方法定義對(duì)象
          MethodIdentifier ident = new MethodIdentifier(method);
        // 去重并添加進(jìn)map
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    // map轉(zhuǎn)ImmutableList
    return ImmutableList.copyOf(identifiers.values());
  }

這樣,我們就分析完了findAllSubscribers()方法,整理一下:當(dāng)注冊(cè)監(jiān)聽者的時(shí)候,首先會(huì)拿到該監(jiān)聽者的類型,然后從緩存中嘗試獲取該監(jiān)聽者對(duì)應(yīng)的所有監(jiān)聽方法,如果沒有的話就遍歷該類的方法進(jìn)行獲取,并添加到緩存中;
然后,會(huì)遍歷上述拿到的方法集合,根據(jù)事件的類型(從方法參數(shù)得知)和監(jiān)聽者等信息創(chuàng)建一個(gè)觀察者,并將事件類型-觀察者鍵值對(duì)插入到一個(gè)一對(duì)多映射表中并返回。

下面,我們看下EventBus中的register()方法的代碼:

void register(Object listener) {
    // 獲取事件類型-觀察者映射表
    Multimap<Class<?>, Subscriber> listenerMethods = this.findAllSubscribers(listener);
    Collection eventMethodsInListener;
    CopyOnWriteArraySet eventSubscribers;
    // 遍歷上述映射表并將新注冊(cè)的觀察者映射表添加到全局的subscribers中
    for(Iterator var3 = listenerMethods.asMap().entrySet().iterator(); var3.hasNext(); eventSubscribers.addAll(eventMethodsInListener)) {
        Entry<Class<?>, Collection<Subscriber>> entry = (Entry)var3.next();
        Class<?> eventType = (Class)entry.getKey();
        eventMethodsInListener = (Collection)entry.getValue();
        eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType);
        // 如果指定事件對(duì)應(yīng)的觀察者列表不存在就創(chuàng)建一個(gè)新的
        if (eventSubscribers == null) {
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet();
            eventSubscribers = (CopyOnWriteArraySet)MoreObjects.firstNonNull(this.subscribers.putIfAbsent(eventType, newSet), newSet);
        }
    }
}

SubscriberRegistry中的register()方法與unregister()方法類似,我們不進(jìn)行說明。下面看下當(dāng)調(diào)用EventBus.post()方法的時(shí)候的邏輯。下面是其代碼:

public void post(Object event) {
    // 調(diào)用SubscriberRegistry的getSubscribers方法獲取該事件對(duì)應(yīng)的全部觀察者
    Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        // 使用Dispatcher對(duì)事件進(jìn)行分發(fā)
        this.dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        this.post(new DeadEvent(this, event));
    }
}

當(dāng)調(diào)用EventBus.post()方法的時(shí)候回先用SubscriberRegistry的getSubscribers方法獲取該事件對(duì)應(yīng)的全部觀察者

Iterator<Subscriber> getSubscribers(Object event) {
    // 獲取事件類型的所有父類型和自身構(gòu)成的集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); // 3
    List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());
    UnmodifiableIterator var4 = eventTypes.iterator();
    // 遍歷上述事件類型,并從subscribers中獲取所有的觀察者列表
    while(var4.hasNext()) {
        Class<?> eventType = (Class)var4.next();
        CopyOnWriteArraySet<Subscriber> eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType);
        if (eventSubscribers != null) {
            subscriberIterators.add(eventSubscribers.iterator());
        }
    }
    return Iterators.concat(subscriberIterators.iterator());
}

Dispatcher
從EventBus.post()方法可以看出,當(dāng)我們使用Dispatcher進(jìn)行事件分發(fā)的時(shí)候,需要將當(dāng)前的事件和所有的觀察者作為參數(shù)傳入到方法中。然后,在方法的內(nèi)部進(jìn)行分發(fā)操作。最終某個(gè)監(jiān)聽者的監(jiān)聽方法是使用反射進(jìn)行觸發(fā)的,這部分邏輯在Subscriber內(nèi)部,而Dispatcher是事件分發(fā)的方式的策略接口。EventBus中提供了3個(gè)默認(rèn)的Dispatcher實(shí)現(xiàn),分別用于不同場(chǎng)景的事件分發(fā):

  • ImmediateDispatcher:直接在當(dāng)前線程中遍歷所有的觀察者并進(jìn)行事件分發(fā);
  • LegacyAsyncDispatcher:異步方法,存在兩個(gè)循環(huán),一先一后,前者用于不斷往全局的隊(duì)列中塞入封裝的觀察者對(duì)象,后者用于不斷從隊(duì)列中取出觀察者對(duì)象進(jìn)行事件分發(fā);實(shí)際上,EventBus有個(gè)字類AsyncEventBus就是用該分發(fā)器進(jìn)行事件分發(fā)的。
  • PerThreadQueuedDispatcher:這種分發(fā)器使用了兩個(gè)線程局部變量進(jìn)行控制,當(dāng)dispatch()方法被調(diào)用的時(shí)候,會(huì)先獲取當(dāng)前線程的觀察者隊(duì)列,并將傳入的觀察者列表傳入到該隊(duì)列中;然后通過一個(gè)布爾類型的線程局部變量,判斷當(dāng)前線程是否正在進(jìn)行分發(fā)操作,如果沒有在進(jìn)行分發(fā)操作,就通過遍歷上述隊(duì)列進(jìn)行事件分發(fā)。

上述三個(gè)分發(fā)器內(nèi)部最終都會(huì)調(diào)用Subscriber的dispatchEvent()方法進(jìn)行事件分發(fā):

final void dispatchEvent(final Object event) {
    // 使用指定的執(zhí)行器執(zhí)行任務(wù)
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                // 使用反射觸發(fā)監(jiān)聽方法
                Subscriber.this.invokeSubscriberMethod(event);
            } catch (InvocationTargetException var2) {
                // 使用EventBus內(nèi)部的SubscriberExceptionHandler處理異常
                Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
            }
        }
    });
}

上述方法中的executor是執(zhí)行器,它是通過EventBus獲取到的;處理異常的SubscriberExceptionHandler類型也是通過EventBus獲取到的。(原來EventBus中的構(gòu)造方法中的字段是在這里用到的?。?/p>

小結(jié)

EventBus中維護(hù)了三個(gè)緩存和四個(gè)映射:

  • 事件類型到觀察者列表的映射(緩存)
  • 事件類型到監(jiān)聽者方法列表的映射(緩存)
  • 事件類型到事件類型及其所有父類的類型的列表的映射(緩存)
  • 觀察者到監(jiān)聽者的映射,觀察者到監(jiān)聽方法的映射;

觀察者Subscriber內(nèi)部封裝了監(jiān)聽者和監(jiān)聽方法,可以直接反射觸發(fā)。而如果是映射到監(jiān)聽者的話,還要判斷監(jiān)聽者的方法的類型來進(jìn)行觸發(fā)

每次使用EventBus注冊(cè)和取消注冊(cè)監(jiān)聽者的時(shí)候,都會(huì)先從緩存中進(jìn)行獲取,不是每一次都會(huì)用到反射的,這可以提升獲取的效率,也解答了我們一開始提出的效率的問題。當(dāng)使用反射觸發(fā)方法的調(diào)用貌似是不可避免的了。

ventBus中使用了非常多的數(shù)據(jù)結(jié)構(gòu),比如MultiMap、CopyOnWriteArraySet等,還有一些緩存和映射的工具庫,這些大部分都來自于Guava。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • EventBus的設(shè)計(jì)理念是基于觀察者模式的,可以參考設(shè)計(jì)模式(1)—觀察者模式先來了解該設(shè)計(jì)模式。 1、程序示例...
    開發(fā)者如是說閱讀 957評(píng)論 0 5
  • EventBus源碼分析(一) EventBus官方介紹為一個(gè)為Android系統(tǒng)優(yōu)化的事件訂閱總線,它不僅可以很...
    蕉下孤客閱讀 4,107評(píng)論 4 42
  • 最近看Elastic-Job源碼,看到它里面實(shí)現(xiàn)的任務(wù)運(yùn)行軌跡的持久化,使用的是Guava的AsyncEventB...
    端木軒閱讀 2,139評(píng)論 2 6
  • 作者:Markus JungingerGithub:greenrobot/EventBus原文:老司機(jī)教你 “飆”...
    敲代碼的本愿閱讀 3,940評(píng)論 3 11
  • 背景 如果遇到問題請(qǐng)?jiān)冢篽ttp://m.itdecent.cn/p/301edd6a2e61討論Event...
    yjy239閱讀 996評(píng)論 0 1

友情鏈接更多精彩內(nèi)容