一、觀察者模式
觀察者模式,Observer Pattern也叫作發(fā)布訂閱模式Publish/Subscribe。定義對(duì)象間一對(duì)多的依賴關(guān)系,使得每當(dāng)一個(gè)對(duì)象改變狀態(tài),則所有依賴與它的對(duì)象都會(huì)得到通知,并被自動(dòng)更新。
觀察者模式的幾角色名稱:
Subject被觀察者,定義被觀察者必須實(shí)現(xiàn)的職責(zé),它能動(dòng)態(tài)的增加取消觀察者,它一般是抽象類或者是實(shí)現(xiàn)類,僅僅完成作為被觀察者必須實(shí)現(xiàn)的職責(zé):管理觀察者并通知觀察者。
Observer觀察者,觀察者接受到消息后,即進(jìn)行更新操作,對(duì)接收到的信息進(jìn)行處理。
ConcreteSubject具體的被觀察者,定義被觀察者自己的業(yè)務(wù)邏輯,同時(shí)定義對(duì)哪些事件進(jìn)行通知。
ConcreteObserver具體的觀察者,每個(gè)觀察者接收到消息后的處理反應(yīng)是不同的,每個(gè)觀察者都有自己的處理邏輯。
觀察者模式的優(yōu)點(diǎn)
- 觀察者和被觀察者之間是抽象耦合,不管是增加觀察者還是被觀察者都非常容易擴(kuò)展。
- 建立一套觸發(fā)機(jī)制。
觀察者模式的缺點(diǎn)
- 觀察者模式需要考慮開(kāi)發(fā)效率和運(yùn)行效率問(wèn)題,一個(gè)被觀察者,多個(gè)觀察者,開(kāi)發(fā)和調(diào)試比較復(fù)雜,Java消息的通知默認(rèn)是順序執(zhí)行的,一個(gè)觀察者卡殼,會(huì)影響整體的執(zhí)行效率。這種情況一般考慮異步的方式。
使用場(chǎng)景
- 關(guān)聯(lián)行為場(chǎng)景,關(guān)聯(lián)是可拆分的。
- 事件多級(jí)觸發(fā)場(chǎng)景。
- 跨系統(tǒng)的消息交換場(chǎng)景,如消息隊(duì)列的處理機(jī)制。
二、Java中的觀察者模式
java.util.Observable類和java.util.Observer接口。
觀察者模式也叫作發(fā)布/訂閱模式。本文不作介紹
三、Spring中的觀察者模式
ApplicationContext中事件處理是由ApplicationEvent類和ApplicationListener接口來(lái)提供的。如果一個(gè)Bean實(shí)現(xiàn)了ApplicationListener接口,并且已經(jīng)發(fā)布到容器中去,每次ApplicationContext發(fā)布一個(gè)ApplicationEvent事件,這個(gè)Bean就會(huì)接到通知。Spring事件機(jī)制是觀察者模式的實(shí)現(xiàn)。
Spring中提供的標(biāo)準(zhǔn)事件:
ContextRefreshEvent,當(dāng)ApplicationContext容器初始化完成或者被刷新的時(shí)候,就會(huì)發(fā)布該事件。比如調(diào)用ConfigurableApplicationContext接口中的refresh()方法。此處的容器初始化指的是所有的Bean都被成功裝載,后處理(post-processor)Bean被檢測(cè)到并且激活,所有單例Bean都被預(yù)實(shí)例化,ApplicationContext容器已經(jīng)可以使用。只要上下文沒(méi)有被關(guān)閉,刷新可以被多次觸發(fā)。XMLWebApplicationContext支持熱刷新,GenericApplicationContext不支持熱刷新。
ContextStartedEvent,當(dāng)ApplicationContext啟動(dòng)的時(shí)候發(fā)布事件,即調(diào)用ConfigurableApplicationContext接口的start方法的時(shí)候。這里的啟動(dòng)是指,所有的被容器管理生命周期的Bean接受到一個(gè)明確的啟動(dòng)信號(hào)。在經(jīng)常需要停止后重新啟動(dòng)的場(chǎng)合比較適用。
ContextStoppedEvent,當(dāng)ApplicationContext容器停止的時(shí)候發(fā)布事件,即調(diào)用ConfigurableApplicationContext的close方法的時(shí)候。這里的停止是指,所有被容器管理生命周期的Bean接到一個(gè)明確的停止信號(hào)。
ContextClosedEvent,當(dāng)ApplicationContext關(guān)閉的時(shí)候發(fā)布事件,即調(diào)用ConfigurableApplicationContext的close方法的時(shí)候,關(guān)閉指的是所有的單例Bean都被銷毀。關(guān)閉上下文后,不能重新刷新或者重新啟動(dòng)。
RequestHandledEvent,只能用于DispatcherServlet的web應(yīng)用,Spring處理用戶請(qǐng)求結(jié)束后,系統(tǒng)會(huì)觸發(fā)該事件。
事件監(jiān)聽(tīng)機(jī)制的應(yīng)用
ApplicationEvent,容器事件,必須被ApplicationContext發(fā)布。
ApplicationListener,監(jiān)聽(tīng)器,可由容器中任何監(jiān)聽(tīng)器Bean擔(dān)任。
實(shí)現(xiàn)了ApplicationListener接口之后,需要實(shí)現(xiàn)方法onApplicationEvent(),在容器將所有的Bean都初始化完成之后,就會(huì)執(zhí)行該方法。
事件監(jiān)聽(tīng)機(jī)制原理
Spring在事件處理機(jī)制中是如何應(yīng)用觀察者模式:
- 事件,ApplicationEvent,該抽象類繼承了EventObject,EventObject是JDK中的類,并建議所有的事件都應(yīng)該繼承自EventObject。
- 事件監(jiān)聽(tīng)器,ApplicationListener,是一個(gè)接口,該接口繼承了EventListener接口。EventListener接口是JDK中的,建議所有的事件監(jiān)聽(tīng)器都應(yīng)該繼承EventListener。
- 事件發(fā)布,ApplicationEventPublisher,ApplicationContext繼承了該接口,在ApplicationContext的抽象實(shí)現(xiàn)類AbstractApplicationContext中做了實(shí)現(xiàn)
AbstractApplicationContext類中publishEvent方法實(shí)現(xiàn):
public void publishEvent(ApplicationEvent event) {
Assert.notNull(event, "Event must not be null");
if (logger.isTraceEnabled()) {
logger.trace("Publishing event in " + getDisplayName() + ": " + event);
}
//事件發(fā)布委托給ApplicationEventMulticaster來(lái)執(zhí)行
getApplicationEventMulticaster().multicastEvent(event);
if (this.parent != null) {
this.parent.publishEvent(event);
}
}
ApplicationEventMulticaster的multicastEvent方法的實(shí)現(xiàn)在SimpleApplicationEventMulticaster類中:
public void multicastEvent(final ApplicationEvent event) {
//獲得監(jiān)聽(tīng)器集合,遍歷監(jiān)聽(tīng)器,可支持同步和異步的廣播事件
for (final ApplicationListener listener : getApplicationListeners(event)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
public void run() {
listener.onApplicationEvent(event);
}
});
}
else {
listener.onApplicationEvent(event);
}
}
}
這就執(zhí)行了了onApplicationEvent方法,這里是事件發(fā)生的地方。
Spring如何根據(jù)事件找到事件對(duì)應(yīng)的監(jiān)聽(tīng)器
在Spring容器初始化的時(shí)候,也就是在refresh方法中:
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
......
try {
......
// Initialize event multicaster for this context.
//初始化一個(gè)事件注冊(cè)表
initApplicationEventMulticaster();
......
// Check for listener beans and register them.
//注冊(cè)事件監(jiān)聽(tīng)器
registerListeners();
......
}
}
}
initApplicationEventMulticaster方法初始化事件注冊(cè)表:
protected void initApplicationEventMulticaster() {
//獲得beanFactory
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//先查找BeanFactory中是否有ApplicationEventMulticaster
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
}
else {//如果BeanFactory中不存在,就創(chuàng)建一個(gè)SimpleApplicationEventMulticaster
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
}
}
在AbstractApplicationEventMulticaster類中有如下屬性:
//注冊(cè)表
private final ListenerRetriever defaultRetriever = new ListenerRetriever(false);
//注冊(cè)表的緩存
private final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<ListenerCacheKey, ListenerRetriever>(64);
private BeanFactory beanFactory;
ListenerRetriever的結(jié)構(gòu)如下:
//用來(lái)存放監(jiān)聽(tīng)事件
public final Set<ApplicationListener> applicationListeners;
//存放監(jiān)聽(tīng)事件的類名稱
public final Set<String> applicationListenerBeans;
private final boolean preFiltered;
初始化注冊(cè)表之后,就會(huì)把事件注冊(cè)到注冊(cè)表中,registerListeners():
protected void registerListeners() {
//獲取所有的Listener,把事件的bean放到ApplicationEventMulticaster中
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
//把事件的名稱放到ApplicationListenerBean里去。
for (String lisName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(lisName);
}
}
Spring使用反射機(jī)制,通過(guò)方法getBeansOfType獲取所有繼承了ApplicationListener接口的監(jiān)聽(tīng)器,然后把監(jiān)聽(tīng)器放到注冊(cè)表中,所以我們可以在Spring配置文件中配置自定義監(jiān)聽(tīng)器,在Spring初始化的時(shí)候,會(huì)把監(jiān)聽(tīng)器自動(dòng)注冊(cè)到注冊(cè)表中去。
ApplicationContext發(fā)布事件可以參考上面的內(nèi)容。發(fā)布事件的時(shí)候的一個(gè)方法,getApplicationListeners:
protected Collection<ApplicationListener> getApplicationListeners(ApplicationEvent event) {
//獲取事件類型
Class<? extends ApplicationEvent> eventType = event.getClass();
//或去事件源類型
Class sourceType = event.getSource().getClass();
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
//從緩存中查找ListenerRetriever
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
//緩存中存在,直接返回對(duì)應(yīng)的Listener
if (retriever != null) {
return retriever.getApplicationListeners();
}
else {//緩存中不存在,就獲取相應(yīng)的Listener
retriever = new ListenerRetriever(true);
LinkedList<ApplicationListener> allListeners = new LinkedList<ApplicationListener>();
Set<ApplicationListener> listeners;
Set<String> listenerBeans;
synchronized (this.defaultRetriever) {
listeners = new LinkedHashSet<ApplicationListener>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<String>(this.defaultRetriever.applicationListenerBeans);
}
//根據(jù)事件類型,事件源類型,獲取所需要的監(jiān)聽(tīng)事件
for (ApplicationListener listener : listeners) {
if (supportsEvent(listener, eventType, sourceType)) {
retriever.applicationListeners.add(listener);
allListeners.add(listener);
}
}
if (!listenerBeans.isEmpty()) {
BeanFactory beanFactory = getBeanFactory();
for (String listenerBeanName : listenerBeans) {
ApplicationListener listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
retriever.applicationListenerBeans.add(listenerBeanName);
allListeners.add(listener);
}
}
}
OrderComparator.sort(allListeners);
this.retrieverCache.put(cacheKey, retriever);
return allListeners;
}
}
根據(jù)事件類型,事件源類型獲取所需要的監(jiān)聽(tīng)器supportsEvent(listener, eventType, sourceType):
protected boolean supportsEvent(
ApplicationListener listener, Class<? extends ApplicationEvent> eventType, Class sourceType) {
SmartApplicationListener smartListener = (listener instanceof SmartApplicationListener ?
(SmartApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
}
這里沒(méi)有進(jìn)行實(shí)際的處理,實(shí)際處理在smartListener.supportsEventType(eventType)和smartListener.supportsSourceType(sourceType)方法中。
smartListener.supportsEventType(eventType):
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
Class typeArg = GenericTypeResolver.resolveTypeArgument(this.delegate.getClass(), ApplicationListener.class);
if (typeArg == null || typeArg.equals(ApplicationEvent.class)) {
Class targetClass = AopUtils.getTargetClass(this.delegate);
if (targetClass != this.delegate.getClass()) {
typeArg = GenericTypeResolver.resolveTypeArgument(targetClass, ApplicationListener.class);
}
}
return (typeArg == null || typeArg.isAssignableFrom(eventType));
}
該方法主要的邏輯就是根據(jù)事件類型判斷是否和監(jiān)聽(tīng)器參數(shù)泛型的類型是否一致。
smartListener.supportsSourceType(sourceType)方法的實(shí)現(xiàn)為:
public boolean supportsSourceType(Class<?> sourceType) {
return true;
}
定義自己的監(jiān)聽(tīng)器要明確指定參數(shù)泛型,表明該監(jiān)聽(tīng)器支持的事件,如果不指明具體的泛型,則沒(méi)有監(jiān)聽(tīng)器監(jiān)聽(tīng)事件。
事件監(jiān)聽(tīng)機(jī)制案例分析
假設(shè)現(xiàn)在有這么一個(gè)業(yè)務(wù)場(chǎng)景:
用戶在商城下單成功后,平臺(tái)要發(fā)送短信通知用戶下單成功
我們最直觀的想法是直接在order()方法中添加發(fā)送短信的業(yè)務(wù)代碼:
public void order(){
// 下單成功
System.out.println("下單成功...");
// 發(fā)送短信
sendSms();
}
咋一看沒(méi)什么不妥,但是如果我們加上一根時(shí)間軸,那么代碼就有問(wèn)題了:
一個(gè)月后,商城搞了自建物流體系,用戶下單成功后,需要通知物流系統(tǒng)發(fā)貨。
于是你又要打開(kāi)OrderService修改order()方法:
public void order(){
// 下單成功
System.out.println("下單成功...");
// 發(fā)送短信
sendSms();
// 通知車隊(duì)發(fā)貨
notifyCar();
}
又過(guò)了一個(gè)月,決策層決定賣掉自己的物流體系,所以下單后就不用通知車隊(duì)了
重新修改OrderService:
public void order(){
// 下單成功
System.out.println("下單成功...");
// 發(fā)送短信
sendSms();
// 車隊(duì)沒(méi)了,注釋掉這行代碼
// notifyCar();
}
這個(gè)示例告訴我們“以增量的方式應(yīng)對(duì)變化的需求”的重要性,可以避免反復(fù)修改一個(gè)既有類的邏輯,不符合面向?qū)ο缶幊痰?em>開(kāi)閉原則。
利用Spring事件機(jī)制完成需求
環(huán)境準(zhǔn)備
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bravo</groupId>
<artifactId>springboot-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
OrderService
/**
* 訂單服務(wù)
*/
@Service
public class OrderService {
@Autowired
private ApplicationContext applicationContext;
public void order() {
// 下單成功
System.out.println("下單成功...");
// 發(fā)布通知
applicationContext.publishEvent(new OrderSuccessEvent(this));
System.out.println("main線程結(jié)束...");
}
}
OrderSuccessEvent(繼承ApplicationEvent,自定義事件)
public class OrderSuccessEvent extends ApplicationEvent {
/**
* Create a new ApplicationEvent.
*
* @param source the object on which the event initially occurred (never {@code null})
*/
public OrderSuccessEvent(Object source) {
super(source);
}
}
SmsService(實(shí)現(xiàn)ApplicationListener,監(jiān)聽(tīng)OrderSuccessEvent)
/**
* 短信服務(wù),監(jiān)聽(tīng)OrderSuccessEvent
*/
@Service
public class SmsService implements ApplicationListener<OrderSuccessEvent> {
@Override
public void onApplicationEvent(OrderSuccessEvent event) {
this.sendSms();
}
/**
* 發(fā)送短信
*/
public void sendSms()
System.out.println("發(fā)送短信...");
}
}
Test
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
@Autowired
private OrderService orderService;
@Test
public void testSpringEvent() {
orderService.order();
}
}
輸出:
下單成功...
發(fā)送短信...
main線程結(jié)束...
如果后期針對(duì)下單成功有新的操作,可以新寫一個(gè)事件監(jiān)聽(tīng)類:
/**
* 物流服務(wù)
*/
@Service
public class CarService implements ApplicationListener<OrderSuccessEvent> {
@Override
public void onApplicationEvent(OrderSuccessEvent event) {
this.dispatch();
}
public void dispatch() {
System.out.println("發(fā)車咯...");
}
}
這就是“以增量的方式應(yīng)對(duì)變化的需求”,而不是去修改已有的代碼。假設(shè)有B接口調(diào)用了C接口,你修改了C接口,那么B接口可能業(yè)務(wù)結(jié)果就錯(cuò)了,此時(shí)調(diào)用B接口的A接口也可能受到影響,是連鎖反應(yīng)。所以,一般我們都提倡“對(duì)擴(kuò)展開(kāi)放,對(duì)修改關(guān)閉”的原則。
上面SmsService既是一個(gè)服務(wù),還是一個(gè)Listener,因?yàn)樗扔蠤Service又實(shí)現(xiàn)了ApplicationListener接口。
但是僅僅是為了一個(gè)監(jiān)聽(tīng)回調(diào)方法而實(shí)現(xiàn)一個(gè)接口,未免麻煩,所以Spring提供了注解的方式:
/**
* 短信服務(wù),監(jiān)聽(tīng)LendSuccessEvent,但不用實(shí)現(xiàn)ApplicationListener
*/
@Service
public class SmsService {
/**
* 發(fā)送短信 @EventListener指定監(jiān)聽(tīng)的事件
*/
@EventListener(LendSuccesssEvent.class)
public void sendSms() {
try {
Thread.sleep(1000L * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("發(fā)送短信...");
}
}
Spring發(fā)布異步事件
看似很完美了,但是你注意到Spring默認(rèn)的事件機(jī)制是同步的:

如果針對(duì)OrderService下單成功的操作越來(lái)越多,比如下單后需要完成的對(duì)應(yīng)操作有十幾個(gè),又該如何應(yīng)對(duì)呢?

所以,你必須想辦法把Spring的事件機(jī)制改成異步的,盡可能快地返回下單的結(jié)果本身,而不是等其他附屬服務(wù)全部完成(涉及到其他問(wèn)題暫時(shí)按下不表)。
要想把Spring事件機(jī)制改造成異步通知,最粗暴的方法是:
OrderService
/**
* 訂單服務(wù)
*/
@Service
public class OrderService {
@Autowired
private ApplicationContext applicationContext;
public void order() {
// 下單成功
System.out.println("下單成功...");
// 發(fā)布通知
new Thread(() ->{
applicationContext.publishEvent(new OrderSuccessEvent(this));
}).start();
System.out.println("main線程結(jié)束...");
// 等SmsService結(jié)束
try {
Thread.sleep(1000L * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
SmsService
/**
* 短信服務(wù),監(jiān)聽(tīng)OrderSuccessEvent
*/
@Service
public class SmsService implements ApplicationListener<OrderSuccessEvent> {
@Override
public void onApplicationEvent(OrderSuccessEvent event) {
this.sendSms();
}
/**
* 發(fā)送短信
*/
public void sendSms()
try {
Thread.sleep(1000L * 3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("發(fā)送短信...");
}
}
輸出:
下單成功...
main線程結(jié)束...
發(fā)送短信...
當(dāng)然,這種做法其實(shí)違背了Spring事件機(jī)制的設(shè)計(jì)初衷。人家會(huì)想不到你要搞異步通知?

當(dāng)SimpleApplicationEventMulticaster中的Executor不為null,就會(huì)執(zhí)行異步通知。
@Configuration
public class AsyncEventConfig {
@Bean(name = "applicationEventMulticaster")
public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster
= new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
return eventMulticaster;
}
}
當(dāng)然了,最后還是要說(shuō)一句,項(xiàng)目并發(fā)高了以后,也不可能用Spring監(jiān)聽(tīng)機(jī)制的,MQ會(huì)更合適些。