springboot整合Netty應(yīng)用

什么是Netty

Netty是一個異步事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架,用于快速開發(fā)可維護(hù)的高性能協(xié)議服務(wù)器和客戶端,Netty是一個NIO客戶端服務(wù)器框架,可以快速輕松地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,例如協(xié)議服務(wù)器和客戶端。它極大地簡化和簡化了諸如TCP和UDP套接字服務(wù)器之類的網(wǎng)絡(luò)編程。

“快速簡便”并不意味著最終的應(yīng)用程序?qū)⒃馐芸删S護(hù)性或性能問題的困擾。Netty經(jīng)過精心設(shè)計,結(jié)合了許多協(xié)議(例如FTP,SMTP,HTTP以及各種基于二進(jìn)制和文本的舊式協(xié)議)的實施經(jīng)驗。結(jié)果,Netty成功地找到了一種無需妥協(xié)即可輕松實現(xiàn)開發(fā),性能,穩(wěn)定性和靈活性的方法。

特性

  • 高性能 事件驅(qū)動
  • 異步非堵塞 基于NIO的客戶端,服務(wù)器端編程框架
  • 穩(wěn)定性和伸縮性
  • 適用于各種傳輸類型的統(tǒng)一API-阻塞和非阻塞套接字
  • 基于靈活且可擴(kuò)展的事件模型,可將關(guān)注點(diǎn)明確分離
  • 高度可定制的線程模型-單線程,一個活多個線程池 ,例如SEDA
  • 真正的無連接數(shù)據(jù)報套接字支持 (從3.1開始)

表現(xiàn)

  • 更高的吞吐量
  • 減少資源消耗
  • 減少不必要的內(nèi)存復(fù)制

使用

在pom.xml中添加依賴

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>

一丶Springboot應(yīng)用啟動加載Netty應(yīng)用

暴露Netty端口,隨springboot應(yīng)用一并啟動

/**
 * @dete: 2021/4/21 9:08 上午
 * @author: 徐子木
 */
@SpringBootApplication
public class PmSocketApplication implements CommandLineRunner {
    
// yml中指定netty端口號
    @Value("${netty.port}")
    private int nettyServerPort;

    @Autowired
    private NettyWebSocketServer nettyServer;

    public static void main(String[] args) {
        SpringApplication.run(PmSocketApplication.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        //netty 服務(wù)端啟動的端口不可和Springboot啟動類的端口號重復(fù)
        nettyServer.start(nettyServerPort);

        //關(guān)閉服務(wù)器的時候同時關(guān)閉Netty服務(wù)
         Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));
    }
}

二丶Netty整合WebSocket服務(wù)端

啟動或銷毀netty服務(wù)端的過程實現(xiàn)

/**
 * Netty整合websocket 服務(wù)端
 * 運(yùn)行流程:
 * 1.創(chuàng)建一個ServerBootstrap的實例引導(dǎo)和綁定服務(wù)器
 * 2.創(chuàng)建并分配一個NioEventLoopGroup實例以進(jìn)行事件的處理,比如接收連接和讀寫數(shù)據(jù)
 * 3.指定服務(wù)器綁定的本地的InetSocketAddress
 * 4.使用一個NettyServerHandler的實例初始化每一個新的Channel
 * 5.調(diào)用ServerBootstrap.bind()方法以綁定服務(wù)器
 *
 * @description
 * @author: 徐子木
 * @create: 2020-06-02 14:23
 **/
@Component
@Slf4j
public class NettyWebSocketServer {


    /**
     * EventLoop接口
     * NioEventLoop中維護(hù)了一個線程和任務(wù)隊列,支持異步提交任務(wù),線程啟動時會調(diào)用NioEventLoop的run方法,執(zhí)行I/O任務(wù)和非I/O任務(wù)
     * I/O任務(wù)即selectionKey中的ready的事件,如accept,connect,read,write等,由processSelectedKeys方法觸發(fā)
     * 非I/O任務(wù)添加到taskQueue中的任務(wù),如register0,bind0等任務(wù),由runAllTasks方法觸發(fā)
     * 兩種任務(wù)的執(zhí)行時間比由變量ioRatio控制,默認(rèn)為50,則表示允許非IO任務(wù)執(zhí)行的事件與IO任務(wù)的執(zhí)行時間相等
     */
    private final EventLoopGroup boosGroup = new NioEventLoopGroup();

    private final EventLoopGroup workGroup = new NioEventLoopGroup();

    /**
     * Channel
     * Channel類似Socket,它代表一個實體(如一個硬件設(shè)備,一個網(wǎng)絡(luò)套接字) 的開放連接,如讀寫操作.通俗的講,Channel字面意思就是通道,每一個客戶端與服務(wù)端之間進(jìn)行通信的一個雙向通道.
     * Channel主要工作:
     * 1.當(dāng)前網(wǎng)絡(luò)連接的通道的狀態(tài)(例如是否打開?是否已連接?)
     * 2.網(wǎng)絡(luò)連接的配置參數(shù)(例如接收緩沖區(qū)的大小)
     * 3.提供異步的網(wǎng)絡(luò)I/O操作(如建立連接,讀寫,綁定端口),異步調(diào)用意味著任何I/O調(diào)用都將立即返回,并且不保證在調(diào)用結(jié)束時鎖清秋的I/O操作已完成.
     * 調(diào)用立即放回一個ChannelFuture實例,通過注冊監(jiān)聽器到ChannelFuture上,可以I/O操作成功,失敗或取消時回調(diào)通知調(diào)用方.
     * 4.支持關(guān)聯(lián)I/O操作與對應(yīng)的處理程序.
     * 不同協(xié)議,不同的阻塞類型的連接都有不同的Channel類型與之對應(yīng),下面是一些常用的Channel類型
     * NioSocketChannel,異步的客戶端 TCP Socket連接
     * NioServerSocketChannel,異步的服務(wù)端 TCP Socket 連接
     * NioDatagramChannel,異步的UDP連接
     * NioSctpChannel,異步的客戶端Sctp連接
     * NioSctoServerChannel,異步的Sctp服務(wù)端連接
     * 這些通道涵蓋了UDP 和TCP網(wǎng)絡(luò)IO以及文件IO
     */
    private Channel channel;


    /**
     * 啟動服務(wù)
     *
     * @param port
     */
    public void start(int port) {
        log.info("=================Netty 端口啟動:{}==================",port);

        /**
         * Future
         * Future提供了另外一種在操作完成時通知應(yīng)用程序的方式,這個對象可以看做一個異步操作的結(jié)果占位符.
         * 通俗的講,它相當(dāng)于一位指揮官,發(fā)送了一個請求建立完連接,通信完畢了,你通知一聲它回來關(guān)閉各種IO通道,整個過程,它是不阻塞的,異步的.
         * 在Netty中所有的IO操作都是異步的,不能理科的值消息是否被正確處理,但是可以過一會兒等他執(zhí)行完成或者直接注冊一個監(jiān)聽,具體的實現(xiàn)就是通過Future和ChannelFutures.
         * 他們可以注冊一個監(jiān)聽,當(dāng)操作執(zhí)行成功成功或者失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件
         */
        try {
            /**
             * Bootstrap
             * Bootstrap是引導(dǎo)的意思,一個Netty應(yīng)用通常由一個Bootstrap開始
             * 主要作用是配置整個Netty程序,串聯(lián)各個組件
             * Netty中Bootstrap類是服務(wù)端啟動引導(dǎo)類
             */
            ServerBootstrap server = new ServerBootstrap();
            server.group(boosGroup, workGroup)
                    //非阻塞異步服務(wù)端TCP Socket 連接
                    .channel(NioServerSocketChannel.class)
                    //設(shè)置為前端WebSocket可以連接
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // HttpServerCodec: 將請求和映帶消息節(jié)嗎為HTTP消息
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            // 講HTTP消息的多個部分合成一條完整的HTTP消息
                            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                            // 向客戶端發(fā)送HTML5文件
                            socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            // 進(jìn)行設(shè)置心跳檢測
                            socketChannel.pipeline().addLast(new IdleStateHandler(60, 30, 60 * 30, TimeUnit.SECONDS));
                            // 配置通道處理 來進(jìn)行業(yè)務(wù)處理
                            pipeline.addLast("handler", new WebSocketServerHandler());
                        }

                    });
            channel = server.bind(port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy() {
        log.info("=================Netty服務(wù)關(guān)閉==================");
        if (channel != null) {
            channel.close();
        }
        boosGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }


}

三丶Socket處理注冊連接類

接收客戶端連接信息,心跳檢測,存儲于銷毀等的處理

其余部分業(yè)務(wù)類(messageService),均為自行處理連接存儲為業(yè)務(wù)所用,請忽略

/**
 * @description
 * @author: 徐子木
 * @create: 2020-06-02 14:57
 **/
@Slf4j
@Component
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    public static final byte PING_MSG = 1;
    public static final byte PONG_MSG = 2;
    public static final byte CUSTOM_MSG = 3;
    private int heartbeatCount = 0;

  // 配置客戶端是否為https的控制
    @Value("${netty.ssl-enabled:false}")
    private Boolean useSsl;

    /**
     * 這里可以引入自己業(yè)務(wù)類來處理進(jìn)行的客戶端連接
     */
    @Autowired
    private MessageService messageService;
    
    public static WebSocketServerHandler webSocketServerHandler;

    /**
     * 解決啟動加載不到自己業(yè)務(wù)類
     */
    @PostConstruct
    public void init() {
        webSocketServerHandler = this;
    }


    private WebSocketServerHandshaker handshaker;


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //http請求和tcp請求分開處理
        if (msg instanceof HttpRequest) {
            handlerHttpRequest(ctx, (HttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            //踩坑: simpleChannelInboundHandler 他會進(jìn)行一次釋放(引用計數(shù)器減一),參考源碼,而我們釋放的時候就變?yōu)榱?,所以必須手動進(jìn)行引用計數(shù)器加1
            WebSocketFrame frame = (WebSocketFrame) msg;
            frame.retain();
            handlerWebSocketFrame(ctx, frame);
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * WebSocket 消息處理
     *
     * @param ctx
     * @param frame
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        //判斷是否是關(guān)閉鏈路的指令
        if (frame instanceof CloseWebSocketFrame) {
            log.info("【" + ctx.channel().remoteAddress() + "】已關(guān)閉(服務(wù)器端)");
            //移除channel
            NioSocketChannel channel = (NioSocketChannel) ctx.channel();
            webSocketServerHandler.messageService.removeConnection(channel);
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
            return;
        }
        //判斷是否是ping消息
        if (frame instanceof PingWebSocketFrame) {
            log.info("【ping】");
            return;
        }
        //判斷實時是pong消息
        if (frame instanceof PongWebSocketFrame) {
            log.info("【pong】");
            return;
        }
        //本例子只支持文本,不支持二進(jìn)制
        if (!(frame instanceof TextWebSocketFrame)) {
            log.info("【不支持二進(jìn)制】");
            throw new UnsupportedOperationException("不支持二進(jìn)制");
        }

        // 傳送的消息 ,接收客戶端指定格式(自己與客戶端約定json格式)的消息,并進(jìn)行處理
        MessageObject messageObject = JSONObject.parseObject(((TextWebSocketFrame) frame).text().toString(), MessageObject.class);
        webSocketServerHandler.messageService.sendMessage(messageObject, ctx);
    }

    /**
     * websocket第一次連接握手
     *
     * @param ctx
     */
    @SuppressWarnings("deprecation")
    private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
// 這里接收客戶端附加連接參數(shù),根據(jù)自己業(yè)務(wù)與客戶端指定需要哪些參數(shù)來辨別連接唯一性
        String userUid = null;
        String sectionId = null;
        if ("GET".equalsIgnoreCase(req.getMethod().toString())) {
            String uri = req.getUri();
            userUid = req.getUri().substring(uri.indexOf("/", 2) + 1, uri.lastIndexOf("/"));
            sectionId = req.getUri().substring(uri.lastIndexOf("/") + 1);
            //對用戶信息進(jìn)行存儲
            NioSocketChannel channel = (NioSocketChannel) ctx.channel();

            webSocketServerHandler.messageService.putConnection(userUid, sectionId, channel);
        }

        // http 解碼失敗
        if (!req.getDecoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, (FullHttpRequest) req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
        }
        //可以通過url獲取其他參數(shù)
        WebSocketServerHandshakerFactory factory;
// 這里主要用于 客戶端為wss連接的處理
        if (useSsl != null && useSsl) {
            factory = new WebSocketServerHandshakerFactory(
                    "wss://" + req.headers().get("Host") + "/" + req.getUri() + "", null, false
            );
        } else {
            factory = new WebSocketServerHandshakerFactory(
                    "ws://" + req.headers().get("Host") + "/" + req.getUri() + "", null, false
            );
        }
        handshaker = factory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            //進(jìn)行連接
            handshaker.handshake(ctx.channel(), (FullHttpRequest) req);
        }
    }

    @SuppressWarnings("deprecation")
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回應(yīng)答給客戶端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);

            // buf.release();
        }
        // 如果是非Keep-Alive,關(guān)閉連接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }


    /**
     * 這里是保持服務(wù)器與客戶端長連接  進(jìn)行心跳檢測 避免連接斷開
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            PingWebSocketFrame ping = new PingWebSocketFrame();
            switch (stateEvent.state()) {
                //讀空閑(服務(wù)器端)
                case READER_IDLE:
                    //log.info("【" + ctx.channel().remoteAddress() + "】讀空閑(服務(wù)器端)");
                    ctx.writeAndFlush(ping);
                    break;
                //寫空閑(客戶端)
                case WRITER_IDLE:
                    //log.info("【" + ctx.channel().remoteAddress() + "】寫空閑(客戶端)");
                    ctx.writeAndFlush(ping);
                    break;
                case ALL_IDLE:
                    //log.info("【" + ctx.channel().remoteAddress() + "】讀寫空閑");
                    break;
                default:
                    break;
            }
        }
    }


    /**
     * 出現(xiàn)異常時
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        //移除channel
        webSocketServerHandler.messageService.removeConnection((NioSocketChannel) ctx.channel());
        ctx.close();
        log.info("【" + ctx.channel().remoteAddress() + "】已關(guān)閉(服務(wù)器端)");
    }


}

四丶與業(yè)務(wù)整合并存儲客戶連接

以下均為自己業(yè)務(wù)與socket連接對象的存儲,發(fā)送,踢出等處理,可根據(jù)自己業(yè)務(wù)自行參考

當(dāng)時公司業(yè)務(wù)需要將連接存儲兩級維度,可根據(jù)自己的業(yè)務(wù)給客戶端分組,或不分組都可以

/**
 * @description
 * @author: 徐子木
 * @create: 2020-09-30 15:06
 **/
@Slf4j
@Service(value = "messageService")
public class MessageServiceImpl implements MessageService {

// 是消息通信dubbo類,請忽略
    @Autowired
    private ChatMsgService chatMsgService;
// 是db存儲類 ,請忽略
    @Autowired
    private BidPresentDao bidPresentDao;

// 這里是netty可為指定的唯一key去與連接進(jìn)行分組處理并存儲
    private HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();

    private final AttributeKey<String> userKey = AttributeKey.valueOf("user");

    private final AttributeKey<String> sectionKey = AttributeKey.valueOf("section");

    /**
     * 裝載標(biāo)段與對應(yīng)在線的用戶
     */
    private static final Map<String, ChannelGroup> SECTION_GROUPS = new ConcurrentHashMap<>();


    /**
     * 維護(hù)某標(biāo)段中的socket連接
     *
     * @param sectionId
     * @param channel
     */
    @Override
    public void putConnection(String userId, String sectionId, NioSocketChannel channel) {

        channel.attr(userKey).set(userId);
        channel.attr(sectionKey).set(sectionId);

        bidPresentDao.comeOnlineByUserId(userId, sectionId);

        //存儲用戶標(biāo)段對應(yīng)連接
        ChannelGroup channelGroup = SECTION_GROUPS.get(sectionId);
        if (null == channelGroup) {
            //保存全局的,連接上的服務(wù)器的客戶
            channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            channelGroup.add(channel);
            SECTION_GROUPS.put(sectionId, channelGroup);
        } else {
            channelGroup.add(channel);
        }
    }

    /**
     * 判斷一個通道是否有用戶在使用
     *
     * @param channel
     * @return
     */
    private boolean hasUser(Channel channel) {
        return ((channel.hasAttr(userKey) || channel.hasAttr(sectionKey)));
    }

    /**
     * 獲取連接對應(yīng)用戶
     *
     * @param channel
     * @return
     */
    @Override
    public String getBindUserId(NioSocketChannel channel) {
        if (hasUser(channel)) {
            return channel.attr(userKey).get();
        }
        return null;
    }

    /**
     * 獲取連接對應(yīng)標(biāo)段Id
     *
     * @param channel
     * @return
     */
    @Override
    public String getBindSectionId(NioSocketChannel channel) {
        if (hasUser(channel)) {
            return channel.attr(sectionKey).get();
        }
        return null;
    }


    /**
     * 用戶退出標(biāo)段在線連接
     *
     * @param channel
     */
    @Override
    public void removeConnection(NioSocketChannel channel) {
        String userId = getBindUserId(channel);
        String sectionId = getBindSectionId(channel);

        Iterator<Map.Entry<String, ChannelGroup>> iterator = SECTION_GROUPS.entrySet().iterator();
        while (iterator.hasNext()) {
            ChannelGroup channelGroup = iterator.next().getValue();
            if (channelGroup.contains(channel)) {
                channelGroup.remove(channel);
            }
            if (null == channelGroup || channelGroup.size() == 0) {
                iterator.remove();
            }
        }
        if (StringUtils.isNotEmpty(userId) && StringUtils.isNotEmpty(sectionId)) {
            bidPresentDao.exitOnlineByUserId(userId, sectionId);
        }
    }

    /**
     * 根據(jù)用戶Id獲取連接
     *
     * @param userId
     * @return
     */
    private NioSocketChannel getChannel(String userId) {
        Iterator<Map.Entry<String, ChannelGroup>> iterator = SECTION_GROUPS.entrySet().iterator();
        while (iterator.hasNext()) {
            ChannelGroup channelGroup = iterator.next().getValue();
            for (Channel channel : channelGroup) {
                if (userId.equalsIgnoreCase(channel.attr(userKey).get())) {
                    return (NioSocketChannel) channel;
                }
            }
        }
        return null;
    }

    /**
     * 發(fā)送純狀態(tài)碼的消息
     *
     * @param toUserId
     * @param message
     */
    @Override
    public void sendMessage(String toUserId, String message) {
        NioSocketChannel channel = getChannel(toUserId);
        if (channel != null) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("message", message);
            channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(jsonObject)));
        }
    }

    /**
     * 向指定投標(biāo)人發(fā)送狀態(tài)
     *
     * @param toUserId
     */
    @Override
    public void sendMessage(String toUserId, MessageEnum messageEnum) {
        NioSocketChannel channel = getChannel(toUserId);
        if (channel != null) {
            MessageObject messageObject = MessageObject.builder().
                    code(messageEnum.getCode())
                    .build();
            channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
        }
    }

    /**
     * 向當(dāng)前標(biāo)段所有投標(biāo)人發(fā)送消息
     *
     * @param sectionId
     */
    @Override
    public void sendMessageAll(String sectionId, MessageEnum messageEnum) {
        log.debug("標(biāo)段Id: {},發(fā)送狀態(tài)碼: {}", sectionId, messageEnum);
        MessageObject messageObject = MessageObject.builder()
                .code(messageEnum.getCode())
                .build();
        sendMessageAll(sectionId, messageObject);
    }

    @Override
    public void sendMessageAll(String sectionId, MessageObject messageObject) {
        ChannelGroup channelGroup = SECTION_GROUPS.get(sectionId);
        if (channelGroup == null || channelGroup.size() == 0) {
            log.warn("暫時無客戶端在線 sectionId:{}", sectionId);
            return;
        }
        channelGroup.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
    }


    /**
     * 根據(jù)狀態(tài)碼處理消息
     *
     * @param message
     */
    @Override
    public void sendMessage(MessageObject message, ChannelHandlerContext ctx) {
        MessageEnum messageEnum = MessageEnum.valuesOf(message.getCode());
        switch (messageEnum) {
            case CHAT_SEND_MESSAGE:
                ChatMsg msg = JSONObject.parseObject(message.getData().toString(), ChatMsg.class);
                flushSectionChat(msg, ctx);
                break;
            case HEART_CONNECT:
                //心跳連接
                break;
            case REDIRECT:
                RedirectEntity redirectEntity = JSONObject.parseObject(message.getData().toString(), RedirectEntity.class);
                flushSectionToObject(redirectEntity.getSectionId(), redirectEntity);
                break;
            case DECODE:
                String sectionId = JSONObject.parseObject(message.getData().toString(), String.class);
                sendMessageAll(sectionId, MessageEnum.DECODE);
                break;
            default:

                break;
        }

    }

    /**
     * 向該標(biāo)段中發(fā)送系統(tǒng)消息
     *
     * @param sectionId
     * @param msg
     */
    @Override
    public void flushSectionSystem(String sectionId, String msg) {
        ChatMsg chatMsg = ChatMsg.builder()
                .sectionId(sectionId)
                .msgType(ChatMsgType.NOTICE.getCode())
                .content(msg)
                .build();
        flushSectionChat(chatMsg, null);
    }

    /**
     * 向當(dāng)前標(biāo)段的在線人員刷新一條消息
     *
     * @param chatMsg
     */
    private void flushSectionChat(ChatMsg chatMsg, ChannelHandlerContext ctx) {
        if (ObjectUtil.isNotEmpty(ctx)) {
            InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
            String ip = inetSocketAddress.getAddress().getHostAddress();
            chatMsg.setIp(ip);
        }

        String sectionId = chatMsg.getSectionId();
        MessageObject messageObject = MessageObject.builder()
                .code(MessageEnum.CHAT_SEND_MESSAGE.getCode())
                .data(chatMsg)
                .build();
        sendMessageAll(sectionId, messageObject);
        chatMsgService.create(chatMsg);
    }


    /**
     * 當(dāng)前標(biāo)段在線人員發(fā)送自定義數(shù)據(jù)
     *
     * @param sectionId
     * @param object
     */
    private void flushSectionToObject(String sectionId, Object object) {
        MessageObject messageObject = MessageObject.builder()
                .code(MessageEnum.REDIRECT.getCode())
                .data(object)
                .build();

        sendMessageAll(sectionId, messageObject);
    }


    /**
     * 向指定用戶發(fā)送自定義數(shù)據(jù)
     *
     * @param toUserId
     */
    @Override
    public void sendMessage(String toUserId, Object object, MessageEnum messageEnum) {
        NioSocketChannel channel = getChannel(toUserId);
        if (channel != null) {
            MessageObject messageObject = MessageObject.builder().
                    code(messageEnum.getCode())
                    .data(object)
                    .build();
            if(MessageEnum.EXTRACT_PARAM_RESULT.equals(messageEnum)){
                log.debug("參數(shù)抽取發(fā)送內(nèi)容:{}",JSONObject.toJSONString(messageObject));
            }
            channel.writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(messageObject)));
        }
    }


}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容