一文詳解 | Java 中 NIO基礎(chǔ)詳解

Netty 是基于Java NIO 封裝的網(wǎng)絡(luò)通訊框架,只有充分理解了 Java NIO 才能理解好Netty的底層設(shè)計(jì)。Java NIO 由三個(gè)核心組件組件:

  • Buffer
  • Channel
  • Selector

緩沖區(qū) Buffer

Buffer 是一個(gè)數(shù)據(jù)對象,我們可以把它理解為固定數(shù)量的數(shù)據(jù)的容器,它包含一些要寫入或者讀出的數(shù)據(jù)。

在 Java NIO 中,任何時(shí)候訪問 NIO 中的數(shù)據(jù),都需要通過緩沖區(qū)(Buffer)進(jìn)行操作。讀取數(shù)據(jù)時(shí),直接從緩沖區(qū)中讀取,寫入數(shù)據(jù)時(shí),寫入至緩沖區(qū)。NIO 最常用的緩沖區(qū)則是 ByteBuffer。下圖是 Buffer 繼承關(guān)系圖:

image

每一個(gè) Java 基本類型都對應(yīng)著一種 Buffer,他們都包含這相同的操作,只不過是所處理的數(shù)據(jù)類型不同而已。

通道 Channel

Channel 是一個(gè)通道,它就像自來水管一樣,網(wǎng)絡(luò)數(shù)據(jù)通過 Channel 這根水管讀取和寫入。傳統(tǒng)的 IO 是基于流進(jìn)行操作的,Channle 和類似,但又有些不同:

image.png

正如上面說到的,Channel 必須要配合 Buffer 一起使用,我們永遠(yuǎn)不可能將數(shù)據(jù)直接寫入到 Channel 中,同樣也不可能直接從 Channel 中讀取數(shù)據(jù)。都是通過從 Channel 讀取數(shù)據(jù)到 Buffer 中或者從 Buffer 寫入數(shù)據(jù)到 Channel 中,如下:

image

簡單點(diǎn)說,Channel 是數(shù)據(jù)的源頭或者數(shù)據(jù)的目的地,用于向 buffer 提供數(shù)據(jù)或者讀取 buffer 數(shù)據(jù),并且對 I/O 提供異步支持。

下圖是 Channel 的類圖

image.png

Channel 為最頂層接口,所有子 Channel 都實(shí)現(xiàn)了該接口,它主要用于 I/O 操作的連接。定義如下:

public interface Channel extends Closeable {

    /**

     * 判斷此通道是否處于打開狀態(tài)。 

     */

    public boolean isOpen();

    /**

     *關(guān)閉此通道。

     */

    public void close() throws IOException;
}

最為重要的Channel實(shí)現(xiàn)類為:

  • FileChannel:一個(gè)用來寫、讀、映射和操作文件的通道
  • DatagramChannel:能通過 UDP 讀寫網(wǎng)絡(luò)中的數(shù)據(jù)
  • SocketChannel: 能通過 TCP 讀寫網(wǎng)絡(luò)中的數(shù)據(jù)
  • ServerSocketChannel:可以監(jiān)聽新進(jìn)來的 TCP 連接,像 Web 服務(wù)器那樣。對每一個(gè)新進(jìn)來的連接都會(huì)創(chuàng)建一個(gè) SocketChannel

多路復(fù)用器 Selector

多路復(fù)用器 Selector,它是 Java NIO 編程的基礎(chǔ),它提供了選擇已經(jīng)就緒的任務(wù)的能力。從底層來看,Selector 提供了詢問通道是否已經(jīng)準(zhǔn)備好執(zhí)行每個(gè) I/O 操作的能力。簡單來講,Selector 會(huì)不斷地輪詢注冊在其上的 Channel,如果某個(gè) Channel 上面發(fā)生了讀或者寫事件,這個(gè) Channel 就處于就緒狀態(tài),會(huì)被 Selector 輪詢出來,然后通過 SelectionKey 可以獲取就緒 Channel 的集合,進(jìn)行后續(xù)的 I/O 操作。

Selector 允許一個(gè)線程處理多個(gè) Channel ,也就是說只要一個(gè)線程復(fù)雜 Selector 的輪詢,就可以處理成千上萬個(gè) Channel ,相比于多線程來處理勢必會(huì)減少線程的上下文切換問題。下圖是一個(gè) Selector 連接三個(gè) Channel :

image

實(shí)例

服務(wù)端

public class NIOServer {
    /*接受數(shù)據(jù)緩沖區(qū)*/

    private ByteBuffer sendbuffer = ByteBuffer.allocate(1024);

    /*發(fā)送數(shù)據(jù)緩沖區(qū)*/

    private  ByteBuffer receivebuffer = ByteBuffer.allocate(1024);

    private Selector selector;

    public NIOServer(int port) throws IOException {

        // 打開服務(wù)器套接字通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 服務(wù)器配置為非阻塞
        serverSocketChannel.configureBlocking(false);

        // 檢索與此通道關(guān)聯(lián)的服務(wù)器套接字
        ServerSocket serverSocket = serverSocketChannel.socket();

        // 進(jìn)行服務(wù)的綁定
        serverSocket.bind(new InetSocketAddress(port));

        // 通過open()方法找到Selector
        selector = Selector.open();

        // 注冊到selector,等待連接
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server Start----:");

    }

    private void listen() throws IOException {

        while (true) {

            selector.select();

            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {

                SelectionKey selectionKey = iterator.next();

                iterator.remove();

                handleKey(selectionKey);

            }

        }

    }

    private void handleKey(SelectionKey selectionKey) throws IOException {

        // 接受請求
        ServerSocketChannel server = null;

        SocketChannel client = null;

        String receiveText;

        String sendText;

        int count=0;

        // 測試此鍵的通道是否已準(zhǔn)備好接受新的套接字連接。
        if (selectionKey.isAcceptable()) {

            // 返回為之創(chuàng)建此鍵的通道。
            server = (ServerSocketChannel) selectionKey.channel();

            // 接受到此通道套接字的連接。
            // 此方法返回的套接字通道(如果有)將處于阻塞模式。
            client = server.accept();

            // 配置為非阻塞
            client.configureBlocking(false);

            // 注冊到selector,等待連接
            client.register(selector, SelectionKey.OP_READ);

        } else if (selectionKey.isReadable()) {

            // 返回為之創(chuàng)建此鍵的通道。
            client = (SocketChannel) selectionKey.channel();

            //將緩沖區(qū)清空以備下次讀取
            receivebuffer.clear();

            //讀取服務(wù)器發(fā)送來的數(shù)據(jù)到緩沖區(qū)中
            count = client.read(receivebuffer);

            if (count > 0) {

                receiveText = new String( receivebuffer.array(),0,count);

                System.out.println("服務(wù)器端接受客戶端數(shù)據(jù)--:"+receiveText);

                client.register(selector, SelectionKey.OP_WRITE);

            }

        } else if (selectionKey.isWritable()) {

            //將緩沖區(qū)清空以備下次寫入
            sendbuffer.clear();

            // 返回為之創(chuàng)建此鍵的通道。
            client = (SocketChannel) selectionKey.channel();

            sendText="message from server--";

            //向緩沖區(qū)中輸入數(shù)據(jù)
            sendbuffer.put(sendText.getBytes());

            //將緩沖區(qū)各標(biāo)志復(fù)位,因?yàn)橄蚶锩鎝ut了數(shù)據(jù)標(biāo)志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位
            sendbuffer.flip();

            //輸出到通道
            client.write(sendbuffer);

            System.out.println("服務(wù)器端向客戶端發(fā)送數(shù)據(jù)--:"+sendText);

            client.register(selector, SelectionKey.OP_READ);

        }

    }

    public static void main(String[] args) throws IOException {

        int port = 8080;

        NIOServer server = new NIOServer(port);

        server.listen();

    }

}

客戶端

public class NIOClient {
    /*接受數(shù)據(jù)緩沖區(qū)*/
    private static ByteBuffer sendbuffer = ByteBuffer.allocate(1024);

    /*發(fā)送數(shù)據(jù)緩沖區(qū)*/
    private static ByteBuffer receivebuffer = ByteBuffer.allocate(1024);

    public static void main(String[] args) throws IOException {

        // 打開socket通道
        SocketChannel socketChannel = SocketChannel.open();

        // 設(shè)置為非阻塞方式
        socketChannel.configureBlocking(false);

        // 打開選擇器
        Selector selector = Selector.open();

        // 注冊連接服務(wù)端socket動(dòng)作
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        // 連接

        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

        Set<SelectionKey> selectionKeys;

        Iterator<SelectionKey> iterator;

        SelectionKey selectionKey;

        SocketChannel client;

        String receiveText;

        String sendText;

        int count=0;

        while (true) {

            //選擇一組鍵,其相應(yīng)的通道已為 I/O 操作準(zhǔn)備就緒。
            //此方法執(zhí)行處于阻塞模式的選擇操作。
            selector.select();

            //返回此選擇器的已選擇鍵集。
            selectionKeys = selector.selectedKeys();

            //System.out.println(selectionKeys.size());
            iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {

                selectionKey = iterator.next();

                if (selectionKey.isConnectable()) {

                    System.out.println("client connect");

                    client = (SocketChannel) selectionKey.channel();

                    // 判斷此通道上是否正在進(jìn)行連接操作。
                    // 完成套接字通道的連接過程。
                    if (client.isConnectionPending()) {

                        client.finishConnect();

                        System.out.println("完成連接!");

                        sendbuffer.clear();

                        sendbuffer.put("Hello,Server".getBytes());

                        sendbuffer.flip();

                        client.write(sendbuffer);

                    }

                    client.register(selector, SelectionKey.OP_READ);

                } else if (selectionKey.isReadable()) {

                    client = (SocketChannel) selectionKey.channel();

                    //將緩沖區(qū)清空以備下次讀取
                    receivebuffer.clear();

                    //讀取服務(wù)器發(fā)送來的數(shù)據(jù)到緩沖區(qū)中
                    count=client.read(receivebuffer);

                    if(count>0){

                        receiveText = new String( receivebuffer.array(),0,count);

                        System.out.println("客戶端接受服務(wù)器端數(shù)據(jù)--:"+receiveText);

                        client.register(selector, SelectionKey.OP_WRITE);

                    }

                } else if (selectionKey.isWritable()) {

                    sendbuffer.clear();

                    client = (SocketChannel) selectionKey.channel();

                    sendText = "message from client--";

                    sendbuffer.put(sendText.getBytes());

                    //將緩沖區(qū)各標(biāo)志復(fù)位,因?yàn)橄蚶锩鎝ut了數(shù)據(jù)標(biāo)志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位
                    sendbuffer.flip();

                    client.write(sendbuffer);

                    System.out.println("客戶端向服務(wù)器端發(fā)送數(shù)據(jù)--:"+sendText);

                    client.register(selector, SelectionKey.OP_READ);

                }

            }

            selectionKeys.clear();

        }

    }
}

運(yùn)行結(jié)果

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容