Springboot+WebSocket+Reids SUBSCRIBE

一、使用場景
前端實時展示Tail信息
二、WebSocket注解介紹
當使用 Spring WebSocket 來實現(xiàn) WebSocket 端點時,可以使用 @ServerEndpoint 注解來標記 WebSocket 端點類,并在該類中實現(xiàn)相應(yīng)的 WebSocket 處理方法。
1.@ServerEndpoint 注解:@ServerEndpoint 注解用于標記一個類作為 WebSocket 端點。它將指定一個 URI,客戶端可以使用該 URI 連接到 WebSocket 端點。示例代碼如下:

java
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {
    // ...
}

上述代碼中,@ServerEndpoint("/websocket") 表示將 MyWebSocketEndpoint 類標記為一個 WebSocket 端點,并使用 /websocket 作為客戶端連接的 URI。

2.@OnOpen 注解:@OnOpen 注解用于標記一個方法,在客戶端與 WebSocket 端點成功建立連接后執(zhí)行。該方法可以包含一個 Session 參數(shù),用于表示客戶端的 WebSocket 會話。示例代碼如下:

java
Copy code
@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    @OnOpen
    public void onOpen(Session session) {
        // 處理連接建立后的邏輯
    }

}

在上述示例中,當客戶端與 WebSocket 端點建立連接后,onOpen() 方法將被調(diào)用,可以在該方法中執(zhí)行連接建立后的邏輯處理。

3.@OnMessage 注解:@OnMessage 注解用于標記一個方法,在接收到客戶端發(fā)送的消息時執(zhí)行。該方法可以包含一個 Session 參數(shù)表示客戶端的 WebSocket 會話,以及一個 String 或 byte[] 參數(shù)表示接收到的消息內(nèi)容。示例代碼如下:

java
Copy code
@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    @OnMessage
    public void onMessage(Session session, String message) {
        // 處理接收到的消息
    }

}

在上述示例中,當客戶端發(fā)送消息到 WebSocket 端點時,onMessage() 方法將被調(diào)用,可以在該方法中處理接收到的消息。

4.@OnClose 注解:@OnClose 注解用于標記一個方法,在客戶端與 WebSocket 端點斷開連接時執(zhí)行。該方法可以包含一個 Session 參數(shù),用于表示客戶端的 WebSocket 會話。示例代碼如下:

java
Copy code
@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    @OnClose
    public void onClose(Session session) {
        // 處理連接關(guān)閉后的邏輯
    }

}

在上述示例中,當客戶端與 WebSocket 端點斷開連接時,onClose() 方法將被調(diào)用,可以在該方法中執(zhí)行連接關(guān)閉后的邏輯處理。

通過使用 @ServerEndpoint 注解和相應(yīng)的 WebSocket 處理方法,您可以創(chuàng)建一個基于 Spring WebSocket 的 WebSocket 端點,并處理與客戶端的連接、消息發(fā)送和連接斷開等操作。

三、需要注意的地方
1.有個地方需要注意,使用@ServerEndpoint注解后,這個Controller還需要用@Component注解嗎?
當使用@ServerEndpoint("/websocket")注解標記一個類作為WebSocket端點時,該類已經(jīng)被隱式地視為一個Spring組件,不需要額外添加@Component注解。

@ServerEndpoint注解本身具有@Component注解的語義,因此被標記為WebSocket端點的類將被Spring自動掃描和管理,無需顯式添加@Component注解。

因此,您只需使用@ServerEndpoint("/websocket")注解來標記WebSocket端點類,無需添加其他Spring組件相關(guān)的注解。Spring將會自動識別和管理WebSocket端點。

可是我在項目中去掉在Controller中@Component注解,用postman測試就404了,wscat也不好使,這是一個讓我疑惑的地方,大家有好的實踐結(jié)果也請評論一下,大家一塊探討一下。

四、實踐和應(yīng)用:通過學(xué)到WebSocket內(nèi)容結(jié)合實際的項目開發(fā)遇到的問題以及解決方案。
問題一:在SpringBoot中使用WebSocket,類中無法通過@Resource和@Autowired注解,將Spring中的Bean注入,因為WebSocket容器和 Spring 容器是兩個獨立的容器,各自負責不同的任務(wù)。WebSocket 容器負責管理 WebSocket端點、處理WebSocket連接等 WebSocket 相關(guān)的功能。Spring 容器負責管理Spring 管理的 Bean、依賴注入、AOP等 Spring 框架相關(guān)的功能
為了實現(xiàn) WebSocket端點與Spring 容器的整合,Spring 提供了·ApplicationContextAware接口,并在 WebSocket 容器初始化時與 Spring 容器進行協(xié)作。具體流程如下:
1.WebSocket 容器檢測到需要實例化的“@ServerEndpoint’類,并進行實例化
2.如果該“@ServerEndpoint'類實現(xiàn)了·ApplicationContextAware'接口,WebSocket容器會通過反射機制調(diào)用其setApplicationContext() 方法
3.WebSocket容器會將Spring 的應(yīng)用程序上下文對象傳遞給·setApplicationContext()方法,以供@ServerEndpoint’類使用。這種協(xié)議的實現(xiàn)使得在“@ServerEndpoint'類中可以訪問Spring 的應(yīng)用程序上下文,以便獲取Spring 管理的 Bean、進行依賴注入等操作,需要注意的是當項目啟動后會進入到setApplicationContext()方法中,之后每次訪問WebSocket的接口,均不在進入setApplicationContext()方法,所以ApplicationContext需要用static修飾。
問題二:在聯(lián)調(diào)項目的時候,前端無法通過Hear傳遞Token,結(jié)合項目排查和測試,確實不能在Hear中傳遞Token。只能將Token當作參數(shù)傳遞到接口中,通過Token,反向獲取Shiro中的用戶信息,完成校驗數(shù)據(jù)。
問題三:在本地聯(lián)調(diào)完畢后,部署測試環(huán)境,發(fā)現(xiàn)用WS方式請求接口,前端拋出了This request has beenblocked; this endpoint must be available over WSS.的異常。WS是不安全的方式,希望通過WSS的方式請求接口。要將WebSocket(@ServerEndpoint)改為使用WSS(WebSocket over SSL / TLS),需要執(zhí)行以下步驟:
1.獲取SSL證書:您需要獲得一個有效的SSL證書。通常,您可以從證書頒發(fā)機構(gòu)(Certificate Authority,CA)獲取SSL證書。
2.配置服務(wù)器:在服務(wù)器上,您需要配置SSL連接以使用您的SSL證書。這涉及到配置服務(wù)器軟件(如Nginx、Apache等)來處理SSL連接。

五、代碼邏輯,項目中有Shiro框架
1.pom

      <!-- Spring Boot Websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.Application

  @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

3.Controller

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;

import org.apache.shiro.session.mgt.SimpleSession;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.SerializationUtils;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;

import javax.websocket.OnClose;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Log4j2
@Component
@ServerEndpoint(value = "/data/ail/{id}/{did}/{token}")
public class TailDetailController implements ApplicationContextAware {
    private static final int ONLINE_COUNT_MAX = 5;
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger();

    private JedisCluster jedis;

    private JdbcTemplate jdbcDsp;

    private JedisPubSub sub;


    //項目啟動的時候能進這個setApplicationContext中不是null,當訪問這個url請求接口的時候就是null,只能把他定義成static
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        DebugDeviceTailDetailController.applicationContext = applicationContext;

    }

    /**
     * 連接建立成功調(diào)用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("did") String did, @PathParam("id") Long id, @PathParam("token") String token) throws IOException {
        if (jdbcDsp == null) {
            jdbcDsp = applicationContext.getBean("jdbcDsp", JdbcTemplate.class);
        }
        if (jedis == null) {
            jedis = applicationContext.getBean("jedis", JedisCluster.class);
        }
       //shiro 存在reidis的中key固定前綴,源碼有,下邊是根據(jù)token從reids取出用戶信息
        token = "shiro:session:" + token;
        byte[] sessionBytes = jedis.get(token.getBytes());
        SimpleSession sessions = null;
        if (sessionBytes != null) {
            sessions = (SimpleSession) SerializationUtils.deserialize(sessionBytes);
        }
        if (sessions == null) {
            session.close();
            return;
        }
        //org.apache.shiro.subject.support.DefaultSubjectContext_PRINCIPALS_SESSION_KEY
        Object attribute = sessions.getAttribute("org.apache.shiro.subject.support.DefaultSubjectContext_PRINCIPALS_SESSION_KEY");
        if (attribute == null) {
            session.close();
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(attribute.toString());
        if (jsonObject == null) {
            session.close();
            return;
        }
        User user = JSONObject.parseObject(JSON.toJSONString(jsonObject), User.class);
        log.info(JSON.toJSONString(user));

        if (dsUmUser == null) {
            session.close();
            throw new IllegalStateException("please login first");
        }

      //這里是給前端打一個最大限制信息
     /*   if (ONLINE_COUNT.incrementAndGet() > ONLINE_COUNT_MAX) {
            try {
                session.getBasicRemote().sendText("max connection limited");
            } catch (IOException e) {
                log.debug("DeviceDebug tail error {}", e.toString());
            }
        }*/

        sub = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                log.info("Received message: {}, on channel: {}", message, channel);
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("send error", e);
                }
            }
        };
        //當colse后這個線程會停止
        new Thread(() -> {
            String key = String.format("aa:%d:bb:%s", id, did);
            //訂閱
            jedis.subscribe(sub, key);
        }, "SubscribeDetailThread").start();

    }

    @OnClose
    public void onClose() {
        if (sub != null) {
            sub.unsubscribe();
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容