用途
- 實時獲取服務端的最新數(shù)據(jù)
- 查看調(diào)度任務的進度和執(zhí)行狀態(tài)
- 用戶感知:修改數(shù)據(jù)后,相關(guān)用戶收到信息
- 提升用戶體驗:耗時業(yè)務異步處理(Excel導入導出,復雜計算)
前端輪詢
這種方式實現(xiàn)簡單,前端通過setInterval定時去請求接口來獲取最新的數(shù)據(jù),當實時性要求不高,更新頻率低的情況下可以使用這種方式。但是當實時性很高的時候,我們的請求會很頻繁,服務器的消耗非常大,而且每次請求的時候服務端的數(shù)據(jù)可能還沒有改變,導致很多請求都是沒有意義的。
setInterval(function () {
// 請求接口操作
// 。。。
},
3000
);
webSocket
WebSocket是基于TCP協(xié)議的,它是全雙工通信的,服務端可以向客戶端發(fā)送信息,客戶端同樣可以向服務器發(fā)送指令,常用于聊天應用中。
pom.xml
SpringBoot提供了websocket的starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
config類
注入ServerEndpointExporter,這個bean會自動注冊使用了@ServerEndpoint注解聲明的Websocket endpoint
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
server類
創(chuàng)建一個服務類:
- 加上
@ServerEndpoint注解,設置WebSocket連接點的服務地址。 - 創(chuàng)建
AtomicInteger用于記錄連接數(shù) - 創(chuàng)建
ConcurrentHashMap用于存放連接信息 -
@OnOpen注解表明該方法在建立連接后調(diào)用 -
@OnClose注解表明該方法在斷開連接后調(diào)用 -
@OnError注解表明該方法在連接異常調(diào)用 -
@OnMessage注解表明該方法在收到客戶端消息后調(diào)用 - 創(chuàng)建推送信息的方法
- 創(chuàng)建移除連接的方法
@ServerEndpoint("/websocket/{userId}")
@Component
public class WebSocketServer {
private final static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
/**
* 當前連接數(shù)
*/
private static AtomicInteger count = new AtomicInteger(0);
/**
* 使用map對象,便于根據(jù)userId來獲取對應的WebSocket,或者放redis里面
*/
private static Map<String, WebSocketServer> websocketMap = new ConcurrentHashMap<>();
/**
* 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
*/
private Session session;
/**
* 對應的用戶ID
*/
private String userId = "";
/**
* 連接建立成功調(diào)用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
try {
this.session = session;
this.userId = userId;
websocketMap.put(userId, this);
// 數(shù)量+1
count.getAndIncrement();
logger.info("websocket 新連接:{}", userId);
} catch (Exception e) {
logger.error("websocket 新建連接 IO異常");
}
}
/**
* 連接關(guān)閉調(diào)用的方法
*/
@OnClose
public void onClose() {
// 刪除
websocketMap.remove(this.userId);
// 數(shù)量-1
count.getAndDecrement();
logger.info("close websocket : {}", this.userId);
}
/**
* 收到客戶端消息后調(diào)用的方法
*
* @param message 客戶端發(fā)送過來的消息
*/
@OnMessage
public void onMessage(String message) {
logger.info("來自客戶端{}的消息:{}", this.userId, message);
}
@OnError
public void onError(Throwable error) {
logger.info("websocket 發(fā)生錯誤,移除當前websocket:{},err:{}", this.userId, error.getMessage());
websocketMap.remove(this.userId);
// 數(shù)量-1
count.getAndDecrement();
}
/**
* 發(fā)送消息 (異步發(fā)送)
*
* @param message 消息主題
*/
private void sendMessage(String message) {
this.session.getAsyncRemote().sendText(message);
}
/**
* 向指定用戶發(fā)送信息
*
* @param userId 用戶id
* @param wsInfo 信息
*/
public static void sendInfo(String userId, String wsInfo) {
if (websocketMap.containsKey(userId)) {
websocketMap.get(userId).sendMessage(wsInfo);
}
}
/**
* 群發(fā)消息
*/
public static void batchSendInfo(String wsInfo, List<String> ids) {
ids.forEach(userId -> sendInfo(userId, wsInfo));
}
/**
* 群發(fā)所有人
*/
public static void batchSendInfo(String wsInfo) {
websocketMap.forEach((k, v) -> v.sendMessage(wsInfo));
}
/**
* 獲取當前連接信息
*/
public static List<String> getIds() {
return new ArrayList<>(websocketMap.keySet());
}
/**
* 獲取當前連接數(shù)量
*/
public static int getUserCount() {
return count.intValue();
}
}
測試接口
@RestController
@RequestMapping("/ws")
public class WebSocketController {
@GetMapping("/push/{message}")
public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
WebSocketServer.batchSendInfo(message);
return ResponseEntity.ok("WebSocket 推送消息給所有人");
}
}
html
在resources/static下創(chuàng)建ws.html,將WebSocket的地址設為服務類中@ServerEndpoint注解所配置的地址
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket</title>
</head>
<body>
<div id="message"></div>
</body>
<script>
let websocket = null;
// 用時間戳模擬登錄用戶
const username = new Date().getTime();
// alert(username)
//判斷當前瀏覽器是否支持WebSocket
if ('WebSocket' in window) {
console.log("瀏覽器支持Websocket");
websocket = new WebSocket('ws://localhost:8080/websocket/' + username);
} else {
alert('當前瀏覽器 不支持 websocket');
}
//連接發(fā)生錯誤的回調(diào)方法
websocket.onerror = function () {
setMessageInnerHTML("WebSocket連接發(fā)生錯誤");
};
//連接成功建立的回調(diào)方法
websocket.onopen = function () {
setMessageInnerHTML("WebSocket連接成功");
};
//接收到消息的回調(diào)方法
websocket.onmessage = function (event) {
setMessageInnerHTML(event.data);
};
//連接關(guān)閉的回調(diào)方法
websocket.onclose = function () {
setMessageInnerHTML("WebSocket連接關(guān)閉");
};
//監(jiān)聽窗口關(guān)閉事件,當窗口關(guān)閉時,主動去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會拋異常。
window.onbeforeunload = function () {
closeWebSocket();
};
//關(guān)閉WebSocket連接
function closeWebSocket() {
websocket.close();
}
//將消息顯示在網(wǎng)頁上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
</html>
測試
啟動項目,訪問http://localhost:8080/ws.html,開啟連接。調(diào)用消息推送接口http://localhost:8080/ws/push/hello,查看網(wǎng)頁顯示信息。
SseEmitter
SseEmitter是SpringMVC(4.2+)提供的一種技術(shù),它是基于Http協(xié)議的,相比WebSocket,它更輕量,但是它只能從服務端向客戶端單向發(fā)送信息。在SpringBoot中我們無需引用其他jar就可以使用。
創(chuàng)建服務類
- 創(chuàng)建
AtomicInteger用于記錄連接數(shù) - 創(chuàng)建
ConcurrentHashMap用于存放連接信息 - 建立連接:創(chuàng)建并返回一個帶有超時時間的
SseEmitter給前端。超時間設為0表示永不過期 - 設置連接結(jié)束的回調(diào)方法
completionCallBack - 設置連接超時的回調(diào)方法
timeoutCallBack - 設置連接異常的回調(diào)方法
errorCallBack - 創(chuàng)建推送信息的方法
SseEmitter.send() - 創(chuàng)建移除連接的方法
public class SseEmitterServer {
private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class);
/**
* 當前連接數(shù)
*/
private static AtomicInteger count = new AtomicInteger(0);
/**
* 使用map對象,便于根據(jù)userId來獲取對應的SseEmitter,或者放redis里面
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 創(chuàng)建用戶連接并返回 SseEmitter
*
* @param userId 用戶ID
* @return SseEmitter
*/
public static SseEmitter connect(String userId) {
// 設置超時時間,0表示不過期。默認30秒,超過時間未完成會拋出異常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 注冊回調(diào)
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
// 數(shù)量+1
count.getAndIncrement();
logger.info("創(chuàng)建新的sse連接,當前用戶:{}", userId);
return sseEmitter;
}
/**
* 給指定用戶發(fā)送信息
*/
public static void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
// sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
sseEmitterMap.get(userId).send(message);
} catch (IOException e) {
logger.error("用戶[{}]推送異常:{}", userId, e.getMessage());
removeUser(userId);
}
}
}
/**
* 群發(fā)消息
*/
public static void batchSendMessage(String wsInfo, List<String> ids) {
ids.forEach(userId -> sendMessage(wsInfo, userId));
}
/**
* 群發(fā)所有人
*/
public static void batchSendMessage(String wsInfo) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(wsInfo, MediaType.APPLICATION_JSON);
} catch (IOException e) {
logger.error("用戶[{}]推送異常:{}", k, e.getMessage());
removeUser(k);
}
});
}
/**
* 移除用戶連接
*/
public static void removeUser(String userId) {
sseEmitterMap.remove(userId);
// 數(shù)量-1
count.getAndDecrement();
logger.info("移除用戶:{}", userId);
}
/**
* 獲取當前連接信息
*/
public static List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 獲取當前連接數(shù)量
*/
public static int getUserCount() {
return count.intValue();
}
private static Runnable completionCallBack(String userId) {
return () -> {
logger.info("結(jié)束連接:{}", userId);
removeUser(userId);
};
}
private static Runnable timeoutCallBack(String userId) {
return () -> {
logger.info("連接超時:{}", userId);
removeUser(userId);
};
}
private static Consumer<Throwable> errorCallBack(String userId) {
return throwable -> {
logger.info("連接異常:{}", userId);
removeUser(userId);
};
}
}
測試接口
@RestController
@RequestMapping("/sse")
public class SseEmitterController {
/**
* 用于創(chuàng)建連接
*/
@GetMapping("/connect/{userId}")
public SseEmitter connect(@PathVariable String userId) {
return SseEmitterServer.connect(userId);
}
@GetMapping("/push/{message}")
public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
SseEmitterServer.batchSendMessage(message);
return ResponseEntity.ok("WebSocket 推送消息給所有人");
}
}
html
在resources/static下創(chuàng)建ws.html,將EventSource的地址設為創(chuàng)建連接的地址
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">關(guān)閉連接</button>
<div id="message"></div>
</body>
<script>
let source = null;
// 用時間戳模擬登錄用戶
const userId = new Date().getTime();
if (!!window.EventSource) {
// 建立連接
source = new EventSource('http://localhost:8080/sse/connect/' + userId);
/**
* 連接一旦建立,就會觸發(fā)open事件
* 另一種寫法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立連接。。。");
}, false);
/**
* 客戶端收到服務器發(fā)來的數(shù)據(jù)
* 另一種寫法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
/**
* 如果發(fā)生通信錯誤(比如連接中斷),就會觸發(fā)error事件
* 或者:
* 另一種寫法:source.onerror = function (event) {}
*/
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("連接關(guān)閉");
} else {
console.log(e);
}
}, false);
} else {
setMessageInnerHTML("你的瀏覽器不支持SSE");
}
// 監(jiān)聽窗口關(guān)閉事件,主動去關(guān)閉sse連接,如果服務端設置永不過期,瀏覽器關(guān)閉后手動清理服務端數(shù)據(jù)
window.onbeforeunload = function () {
closeSse();
};
// 關(guān)閉Sse連接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true);
httpRequest.send();
console.log("close");
}
// 將消息顯示在網(wǎng)頁上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
</html>
測試
啟動項目,訪問網(wǎng)頁http://localhost:8080/sse.html建立連接。調(diào)用發(fā)送信息接口http://localhost:8080/sse/push/hello,查看網(wǎng)頁顯示信息。
訪問源碼
所有代碼均上傳至Github上,方便大家訪問
>>>>>> 消息推送之 WebSocket 和 SseEmitter <<<<<<
日常求贊
創(chuàng)作不易,如果各位覺得有幫助,求點贊 支持
求關(guān)注
