SpringBoot 消息推送之 WebSocket 和 SseEmitter

用途

  • 實時獲取服務端的最新數(shù)據(jù)
  • 查看調(diào)度任務的進度和執(zhí)行狀態(tài)
  • 用戶感知:修改數(shù)據(jù)后,相關(guān)用戶收到信息
  • 提升用戶體驗:耗時業(yè)務異步處理(Excel導入導出,復雜計算)

前端輪詢

這種方式實現(xiàn)簡單,前端通過setInterval定時去請求接口來獲取最新的數(shù)據(jù),當實時性要求不高,更新頻率低的情況下可以使用這種方式。但是當實時性很高的時候,我們的請求會很頻繁,服務器的消耗非常大,而且每次請求的時候服務端的數(shù)據(jù)可能還沒有改變,導致很多請求都是沒有意義的。

    setInterval(function () {
            // 請求接口操作
            // 。。。
        },
        3000
    );

webSocket

WebSocket是基于TCP協(xié)議的,它是全雙工通信的,服務端可以向客戶端發(fā)送信息,客戶端同樣可以向服務器發(fā)送指令,常用于聊天應用中。

pom.xml

SpringBoot提供了websocket的starter

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

config類

注入ServerEndpointExporter,這個bean會自動注冊使用了@ServerEndpoint注解聲明的Websocket endpoint

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

server類

創(chuàng)建一個服務類:

  • 加上@ServerEndpoint注解,設置WebSocket連接點的服務地址。
  • 創(chuàng)建AtomicInteger用于記錄連接數(shù)
  • 創(chuàng)建ConcurrentHashMap用于存放連接信息
  • @OnOpen注解表明該方法在建立連接后調(diào)用
  • @OnClose注解表明該方法在斷開連接后調(diào)用
  • @OnError注解表明該方法在連接異常調(diào)用
  • @OnMessage注解表明該方法在收到客戶端消息后調(diào)用
  • 創(chuàng)建推送信息的方法
  • 創(chuàng)建移除連接的方法
@ServerEndpoint("/websocket/{userId}")
@Component
public class WebSocketServer {

    private final static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 當前連接數(shù)
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 使用map對象,便于根據(jù)userId來獲取對應的WebSocket,或者放redis里面
     */
    private static Map<String, WebSocketServer> websocketMap = new ConcurrentHashMap<>();

    /**
     * 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
     */
    private Session session;

    /**
     * 對應的用戶ID
     */
    private String userId = "";

    /**
     * 連接建立成功調(diào)用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        try {
            this.session = session;
            this.userId = userId;
            websocketMap.put(userId, this);
            // 數(shù)量+1
            count.getAndIncrement();
            logger.info("websocket 新連接:{}", userId);
        } catch (Exception e) {
            logger.error("websocket 新建連接 IO異常");
        }
    }

    /**
     * 連接關(guān)閉調(diào)用的方法
     */
    @OnClose
    public void onClose() {
        // 刪除
        websocketMap.remove(this.userId);
        // 數(shù)量-1
        count.getAndDecrement();
        logger.info("close websocket : {}", this.userId);
    }

    /**
     * 收到客戶端消息后調(diào)用的方法
     *
     * @param message 客戶端發(fā)送過來的消息
     */
    @OnMessage
    public void onMessage(String message) {
        logger.info("來自客戶端{}的消息:{}", this.userId, message);
    }

    @OnError
    public void onError(Throwable error) {
        logger.info("websocket 發(fā)生錯誤,移除當前websocket:{},err:{}", this.userId, error.getMessage());
        websocketMap.remove(this.userId);
        // 數(shù)量-1
        count.getAndDecrement();
    }

    /**
     * 發(fā)送消息 (異步發(fā)送)
     *
     * @param message 消息主題
     */
    private void sendMessage(String message) {
        this.session.getAsyncRemote().sendText(message);
    }

    /**
     * 向指定用戶發(fā)送信息
     *
     * @param userId 用戶id
     * @param wsInfo 信息
     */
    public static void sendInfo(String userId, String wsInfo) {
        if (websocketMap.containsKey(userId)) {
            websocketMap.get(userId).sendMessage(wsInfo);
        }
    }

    /**
     * 群發(fā)消息
     */
    public static void batchSendInfo(String wsInfo, List<String> ids) {
        ids.forEach(userId -> sendInfo(userId, wsInfo));
    }

    /**
     * 群發(fā)所有人
     */
    public static void batchSendInfo(String wsInfo) {
        websocketMap.forEach((k, v) -> v.sendMessage(wsInfo));
    }

    /**
     * 獲取當前連接信息
     */
    public static List<String> getIds() {
        return new ArrayList<>(websocketMap.keySet());
    }

    /**
     * 獲取當前連接數(shù)量
     */
    public static int getUserCount() {
        return count.intValue();
    }
}

測試接口

@RestController
@RequestMapping("/ws")
public class WebSocketController {
    
    @GetMapping("/push/{message}")
    public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
        WebSocketServer.batchSendInfo(message);
        return ResponseEntity.ok("WebSocket 推送消息給所有人");
    }

}

html

resources/static下創(chuàng)建ws.html,將WebSocket的地址設為服務類中@ServerEndpoint注解所配置的地址

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket</title>
</head>

<body>
<div id="message"></div>
</body>

<script>
    let websocket = null;

    // 用時間戳模擬登錄用戶
    const username = new Date().getTime();
    // alert(username)
    //判斷當前瀏覽器是否支持WebSocket
    if ('WebSocket' in window) {
        console.log("瀏覽器支持Websocket");
        websocket = new WebSocket('ws://localhost:8080/websocket/' + username);
    } else {
        alert('當前瀏覽器 不支持 websocket');
    }

    //連接發(fā)生錯誤的回調(diào)方法
    websocket.onerror = function () {
        setMessageInnerHTML("WebSocket連接發(fā)生錯誤");
    };

    //連接成功建立的回調(diào)方法
    websocket.onopen = function () {
        setMessageInnerHTML("WebSocket連接成功");
    };

    //接收到消息的回調(diào)方法
    websocket.onmessage = function (event) {
        setMessageInnerHTML(event.data);
    };

    //連接關(guān)閉的回調(diào)方法
    websocket.onclose = function () {
        setMessageInnerHTML("WebSocket連接關(guān)閉");
    };

    //監(jiān)聽窗口關(guān)閉事件,當窗口關(guān)閉時,主動去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會拋異常。
    window.onbeforeunload = function () {
        closeWebSocket();
    };

    //關(guān)閉WebSocket連接
    function closeWebSocket() {
        websocket.close();
    }

    //將消息顯示在網(wǎng)頁上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
</html>

測試

啟動項目,訪問http://localhost:8080/ws.html,開啟連接。調(diào)用消息推送接口http://localhost:8080/ws/push/hello,查看網(wǎng)頁顯示信息。

SseEmitter

SseEmitter是SpringMVC(4.2+)提供的一種技術(shù),它是基于Http協(xié)議的,相比WebSocket,它更輕量,但是它只能從服務端向客戶端單向發(fā)送信息。在SpringBoot中我們無需引用其他jar就可以使用。

創(chuàng)建服務類

  • 創(chuàng)建AtomicInteger用于記錄連接數(shù)
  • 創(chuàng)建ConcurrentHashMap用于存放連接信息
  • 建立連接:創(chuàng)建并返回一個帶有超時時間的SseEmitter給前端。超時間設為0表示永不過期
  • 設置連接結(jié)束的回調(diào)方法completionCallBack
  • 設置連接超時的回調(diào)方法timeoutCallBack
  • 設置連接異常的回調(diào)方法errorCallBack
  • 創(chuàng)建推送信息的方法SseEmitter.send()
  • 創(chuàng)建移除連接的方法
public class SseEmitterServer {

    private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class);

    /**
     * 當前連接數(shù)
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 使用map對象,便于根據(jù)userId來獲取對應的SseEmitter,或者放redis里面
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 創(chuàng)建用戶連接并返回 SseEmitter
     *
     * @param userId 用戶ID
     * @return SseEmitter
     */
    public static SseEmitter connect(String userId) {
        // 設置超時時間,0表示不過期。默認30秒,超過時間未完成會拋出異常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注冊回調(diào)
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);
        // 數(shù)量+1
        count.getAndIncrement();
        logger.info("創(chuàng)建新的sse連接,當前用戶:{}", userId);
        return sseEmitter;
    }

    /**
     * 給指定用戶發(fā)送信息
     */
    public static void sendMessage(String userId, String message) {
        if (sseEmitterMap.containsKey(userId)) {
            try {
                // sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
                sseEmitterMap.get(userId).send(message);
            } catch (IOException e) {
                logger.error("用戶[{}]推送異常:{}", userId, e.getMessage());
                removeUser(userId);
            }
        }
    }

    /**
     * 群發(fā)消息
     */
    public static void batchSendMessage(String wsInfo, List<String> ids) {
        ids.forEach(userId -> sendMessage(wsInfo, userId));
    }

    /**
     * 群發(fā)所有人
     */
    public static void batchSendMessage(String wsInfo) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(wsInfo, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                logger.error("用戶[{}]推送異常:{}", k, e.getMessage());
                removeUser(k);
            }
        });
    }

    /**
     * 移除用戶連接
     */
    public static void removeUser(String userId) {
        sseEmitterMap.remove(userId);
        // 數(shù)量-1
        count.getAndDecrement();
        logger.info("移除用戶:{}", userId);
    }

    /**
     * 獲取當前連接信息
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 獲取當前連接數(shù)量
     */
    public static int getUserCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String userId) {
        return () -> {
            logger.info("結(jié)束連接:{}", userId);
            removeUser(userId);
        };
    }

    private static Runnable timeoutCallBack(String userId) {
        return () -> {
            logger.info("連接超時:{}", userId);
            removeUser(userId);
        };
    }

    private static Consumer<Throwable> errorCallBack(String userId) {
        return throwable -> {
            logger.info("連接異常:{}", userId);
            removeUser(userId);
        };
    }

}

測試接口

@RestController
@RequestMapping("/sse")
public class SseEmitterController {

    /**
     * 用于創(chuàng)建連接
     */
   @GetMapping("/connect/{userId}")
    public SseEmitter connect(@PathVariable String userId) {
        return SseEmitterServer.connect(userId);
    }
    
    @GetMapping("/push/{message}")
    public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
        SseEmitterServer.batchSendMessage(message);
        return ResponseEntity.ok("WebSocket 推送消息給所有人");
    }
    
}

html

resources/static下創(chuàng)建ws.html,將EventSource的地址設為創(chuàng)建連接的地址

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SseEmitter</title>
</head>
<body>
<button onclick="closeSse()">關(guān)閉連接</button>
<div id="message"></div>
</body>
<script>
    let source = null;

    // 用時間戳模擬登錄用戶
    const userId = new Date().getTime();

    if (!!window.EventSource) {

        // 建立連接
        source = new EventSource('http://localhost:8080/sse/connect/' + userId);

        /**
         * 連接一旦建立,就會觸發(fā)open事件
         * 另一種寫法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立連接。。。");
        }, false);

        /**
         * 客戶端收到服務器發(fā)來的數(shù)據(jù)
         * 另一種寫法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });


        /**
         * 如果發(fā)生通信錯誤(比如連接中斷),就會觸發(fā)error事件
         * 或者:
         * 另一種寫法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("連接關(guān)閉");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的瀏覽器不支持SSE");
    }

    // 監(jiān)聽窗口關(guān)閉事件,主動去關(guān)閉sse連接,如果服務端設置永不過期,瀏覽器關(guān)閉后手動清理服務端數(shù)據(jù)
    window.onbeforeunload = function () {
        closeSse();
    };

    // 關(guān)閉Sse連接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true);
        httpRequest.send();
        console.log("close");
    }

    // 將消息顯示在網(wǎng)頁上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
</html>

測試

啟動項目,訪問網(wǎng)頁http://localhost:8080/sse.html建立連接。調(diào)用發(fā)送信息接口http://localhost:8080/sse/push/hello,查看網(wǎng)頁顯示信息。

訪問源碼

所有代碼均上傳至Github上,方便大家訪問

>>>>>> 消息推送之 WebSocket 和 SseEmitter <<<<<<

日常求贊

創(chuàng)作不易,如果各位覺得有幫助,求點贊 支持

求關(guān)注

俞大仙
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容