SpringBoot2.x集成WebSocket實現(xiàn)廣播和點對點推送消息

?WebSocket是一種在單個TCP連接上進行全雙工通信的協(xié)議。WebSocket 協(xié)議在2008年誕生,2011年成為國際標準?,F(xiàn)在所有瀏覽器都已經(jīng)支持了。它的最大特點就是,服務(wù)器可以主動向客戶端推送信息,客戶端也可以主動向服務(wù)器發(fā)送信息,是真正的雙向平等對話,屬于服務(wù)器推送技術(shù)的一種。

?最近有需求使用WebSocket作為中間件去轉(zhuǎn)發(fā)數(shù)據(jù),于是就使用SpringBoot2.x與其進行整合,在此期間遇到了一些問題,總算是都解決了。話不多說,直接上代碼。

1、添加jar支持,build.gradle如下:

plugins {
    id 'org.springframework.boot' version '2.1.3.RELEASE'
    id 'java'
    id 'idea'
   // id 'war'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.tiantian'
version = '1.0.0'
sourceCompatibility = '1.8'
targetCompatibility = '1.8'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation (
            'org.springframework.boot:spring-boot-starter-test'
    )

    implementation (
            'org.springframework.boot:spring-boot-starter-web',
            'org.springframework.boot:spring-boot-starter-websocket',
            //'org.yeauty:netty-websocket-spring-boot-starter:0.7.4',
            'org.springframework.boot:spring-boot-starter-thymeleaf',
            'org.apache.commons:commons-lang3:3.8.1',
            'com.alibaba:fastjson:1.2.56'
    )

    // lombok支持,打包時處理
    compileOnly ('org.projectlombok:lombok:1.18.6')
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
}

// 使用bootJar打jar包
jar {
    baseName = 'webserver'
    version = '1.0.0'
    manifest {
        attributes "Manifest-Version": 1.0,
                'Main-Class': 'com.tiantian.webserver.WebserverApplication'
    }
}

// 解決代碼中的中文編譯報錯
tasks.withType(JavaCompile) {
    options.encoding = "UTF-8"
}

說明:此項目是打成jar運行的,當然也可以打成war放在Tomcat上運行。

2、配置開啟WebSocket功能

/**
 * WebSocket配置
 *
 * @author yueli.liao
 * @date 2019-03-12 11:25
 */
@Configuration
public class WebSocketConfig {

    /**
     * 開啟WebSocket功能
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
    
}

3、WebSocket核心服務(wù)

/**
 * WebSocket服務(wù)入口
 *
 * @author yueli.liao
 * @date 2019-03-12 15:16
 */
@Slf4j
@Component
@ServerEndpoint("/websocket/{id}")
public class WebSocketServer {

    // 客戶端ID
    private String id = "";

    // 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
    private Session session;

    // 記錄當前在線連接數(shù)(為保證線程安全,須對使用此變量的方法加lock或synchronized)
    private static int onlineCount = 0;

    // 用來存儲當前在線的客戶端(此map線程安全)
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

    /**
     * 連接建立成功后調(diào)用
     */
    @OnOpen
    public void onOpen(@PathParam(value = "id") String id, Session session) {
        this.session = session;
        this.id = id; // 接收到發(fā)送消息的客戶端編號
        webSocketMap.put(id, this); // 加入map中
        addOnlineCount();           // 在線數(shù)加1
        log.info("客戶端" + id + "加入,當前在線數(shù)為:" + getOnlineCount());
        try {
            sendMessage("WebSocket連接成功");
        } catch (IOException e) {
            log.error("WebSocket IO異常");
        }
    }

    /**
     * 連接關(guān)閉時調(diào)用
     */
    @OnClose
    public void onClose() {
        webSocketMap.remove(this); // 從map中刪除
        subOnlineCount();               // 在線數(shù)減1
        log.info("有一連接關(guān)閉,當前在線數(shù)為:" + getOnlineCount());
    }

    /**
     * 收到客戶端消息后調(diào)用
     *
     * @param message 客戶端發(fā)送過來的消息<br/>
     *                消息格式:內(nèi)容 - 表示群發(fā),內(nèi)容|X - 表示發(fā)給id為X的客戶端
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("來自客戶端的消息:" + message);
        String[] messages = message.split("[|]");
        try {
            if (messages.length > 1) {
                sendToUser(messages[0], messages[1]);
            } else {
                sendToAll(messages[0]);
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * 發(fā)生錯誤時回調(diào)
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket發(fā)生錯誤");
        error.printStackTrace();
    }

    /**
     * 推送信息給指定ID客戶端,如客戶端不在線,則返回不在線信息給自己
     *
     * @param message 客戶端發(fā)來的消息
     * @param sendClientId 客戶端ID
     * @throws IOException
     */
    public void sendToUser(String message, String sendClientId) throws IOException {
        if (webSocketMap.get(sendClientId) != null) {
            if (!id.equals(sendClientId)) {
                webSocketMap.get(sendClientId).sendMessage("客戶端" + id + "發(fā)來消息:" + " <br/> " + message);
            } else {
                webSocketMap.get(sendClientId).sendMessage(message);
            }
        } else {
            // 如客戶端不在線,則返回不在線信息給自己
            sendToUser("當前客戶端不在線", id);
        }
    }

    /**
     * 推送發(fā)送信息給所有人
     *
     * @param message 要推送的消息
     * @throws IOException
     */
    public void sendToAll(String message) throws IOException {
        for (String key : webSocketMap.keySet()) {
            webSocketMap.get(key).sendMessage(message);
        }
    }

    /**
     * 推送消息
     *
     * @param message 要推送的消息
     * @throws IOException
     */
    private void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    private static synchronized int getOnlineCount() {
        return onlineCount;
    }

    private static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    private static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}

4、為解決大文本(超過WebSocket協(xié)議限制的長度)不能發(fā)送的問題,有兩種解決辦法,一種是在Tomcat \conf\web.xml文件添加如下代碼:

<!-- 注:單位為byte -->
<context-param>
    <param-name>org.apache.tomcat.websocket.textBufferSize</param-name>
    <param-value>1024000</param-value>
</context-param>
<context-param>
    <param-name>org.apache.tomcat.websocket.binaryBufferSize</param-name>
    <param-value>1024000</param-value>
</context-param>

另一種是在服務(wù)啟動時動態(tài)進行設(shè)置,在項目中添加如下代碼:

/**
 * 解決WebSocket傳輸內(nèi)容過長的問題
 * 注:若發(fā)送內(nèi)容過長時,服務(wù)端無內(nèi)容輸出,連接會自動關(guān)閉
 *
 * @author yueli.liao
 * @date 2019-03-13 13:35
 */
@Slf4j
@Configuration
@ComponentScan
@EnableAutoConfiguration
public class WebAppRootContext implements ServletContextInitializer {

    @Value("${websocket.bufferSize}")
    private String bufferSize;

    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        log.info("WebSocket最大文本長度:" + bufferSize);
        servletContext.addListener(WebAppRootListener.class);
        servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize", bufferSize);
    }

}

代碼中的websocket.bufferSize,在application.properties進行配置,如下:

#websocket textSize(單位為byte)
websocket.bufferSize=1024000

說明:推薦使用第二種方式,這樣可移植性強,對部署和開發(fā)都很方便。

5、測試,也有兩種方式,采用哪種或兩種都可以。

  • 一是在Google或Firefox瀏覽器中安裝WebSocket Client插件進行測試;
  • 二是在項目中寫相關(guān)的JS進行測試,代碼如下:
<!DOCTYPE HTML>
<html>
<head>
    <title>WebSocket測試</title>
</head>

<body>
Welcome<br/>
<input id="text" type="text" />
<button onclick="send()">發(fā)送消息</button>
<button onclick="closeWebSocket()">關(guān)閉連接</button>
<div id="message">
</div>
</body>

<script type="text/javascript">
    var websocket = null;

    // 判斷當前瀏覽器是否支持WebSocket
    if('WebSocket' in window){
        // 為了方便測試,故將鏈接寫死
        websocket = new WebSocket("ws://localhost:8088/websocket/1");
    } else{
        alert('Not support websocket')
    }

    // 連接發(fā)生錯誤的回調(diào)方法
    websocket.onerror = function(){
        setMessageInnerHTML("發(fā)生錯誤");
    };

    // 連接成功建立的回調(diào)方法
    websocket.onopen = function(event){
        setMessageInnerHTML("打開連接");
    }

    // 接收到消息的回調(diào)方法
    websocket.onmessage = function(event){
        setMessageInnerHTML(event.data);
    }

    // 連接關(guān)閉的回調(diào)方法
    websocket.onclose = function(){
        setMessageInnerHTML("關(guān)閉連接");
    }

    // 監(jiān)聽窗口關(guān)閉事件,當窗口關(guān)閉時,主動去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會拋異常
    window.onbeforeunload = function(){
        websocket.close();
    }

    // 將消息顯示在網(wǎng)頁上
    function setMessageInnerHTML(innerHTML){
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }

    // 關(guān)閉連接
    function closeWebSocket(){
        websocket.close();
    }

    // 發(fā)送消息
    function send(){
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</html>

說明:此webserver可以進行點對點推送消息,也可以進行廣播。源代碼已上傳到GitHub,可自行下載。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容