Netty理論一:Java IO與NIO

1、Linux IO模型

《Unix網(wǎng)絡(luò)編程》把I/O模型分成五類

  • 阻塞式I/O模型:整個(gè)過程都是阻塞的——BIO(java socket)


    image.png
image.png
  • 非阻塞式I/O模型:只有從內(nèi)核空間復(fù)制數(shù)據(jù)時(shí)才是阻塞的,從磁盤到內(nèi)核空間是非阻塞的——NIO

線程發(fā)起io請(qǐng)求后,立即返回(非阻塞io)。用戶線程不阻塞等待,但是,用戶線程要定時(shí)輪詢檢查數(shù)據(jù)是否就緒,當(dāng)數(shù)據(jù)就緒后,用戶線程將數(shù)據(jù)從用戶空間寫入socket空間,或從socket空間讀取數(shù)據(jù)到用戶空間(同步)。

image.png
image.png
  • I/O復(fù)用:一次監(jiān)控一批通道,看哪個(gè)通道有數(shù)據(jù)就返回哪個(gè)——NIO(Java)

當(dāng)用戶線程發(fā)起io請(qǐng)求后,將socket連接及關(guān)注事件注冊到selector(多路復(fù)用器,os級(jí)別線程)上,selector循環(huán)遍歷socket連接,看是否有關(guān)注數(shù)據(jù)就緒,如果連接有數(shù)據(jù)就緒后,就通知應(yīng)用程序,建立線程進(jìn)行數(shù)據(jù)讀寫。同BIO對(duì)比,NIO中線程處理的都是有效連接(數(shù)據(jù)就緒),且一個(gè)線程可以分管處理多個(gè)連接上的就緒數(shù)據(jù),節(jié)省線程資源開銷

image.png

selector:注冊的socket事件由數(shù)組管理,長度有限制,輪詢查找時(shí)需要遍歷數(shù)組。
netty中創(chuàng)建EventLoopGroup時(shí)可以選擇使用NioEventLoopGroup,NioEventLoopGroup管理的NioEventLoop中使用的就是selector

poll:注冊的socket事件由鏈表實(shí)現(xiàn),數(shù)量沒有限制,遍歷鏈表輪詢查找。

  • 信號(hào)驅(qū)動(dòng)式I/O:從磁盤拷貝到內(nèi)核空間(這個(gè)過程是非阻塞的)以后,信號(hào)通知用戶進(jìn)程,然后從內(nèi)核空間拷貝用戶空間(這個(gè)過程是阻塞的)

    image.png
image.png

epoll:基于事件驅(qū)動(dòng)思想,采用reactor模式,通過事件回調(diào),無需使用某種方式主動(dòng)檢查socket狀態(tài),被動(dòng)接收就緒事件即可。
netty中創(chuàng)建EventLoopGroup時(shí)可以選擇使用EpollEventLoopGroup。

  • 異步I/O:整個(gè)過程都是異步的,當(dāng)數(shù)據(jù)已經(jīng)拷貝到用戶空間以后,才發(fā)信號(hào)通知進(jìn)程——AIO(Java)
    線程發(fā)起io請(qǐng)求后,立即返回(非阻塞io),當(dāng)數(shù)據(jù)讀寫完成后,OS通知用戶線程(異步)。這里數(shù)據(jù)寫入socket空間,或從socket空間讀取數(shù)據(jù)到用戶空間由OS完成,用戶線程無需介入,所以也就不會(huì)阻塞用戶線程,即異步
image.png
image.png

Linux IO流程

  • 內(nèi)核空間等待數(shù)據(jù)準(zhǔn)備好(Waiting for the data to be ready)
  • 從內(nèi)核向進(jìn)程復(fù)制數(shù)據(jù)(copying the data from the kernel to the process)


    image.png

各種I/O模型的比較

image.png

2、 Java BIO(Blocking I/O)

  • ServerSocket
  • Socket
image.png

Demo

package com.demo.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MultiThreadedEchoServer {
    private int port;

    public MultiThreadedEchoServer(int port) {
        this.port = port;
    }

    public void startServer() {
        ServerSocket echoServer = null;
        Executor executor = Executors.newFixedThreadPool(5);
        int i = 0;
        System.out.println("服務(wù)器在端口[" + this.port + "]等待客戶請(qǐng)求......");
        try {
            echoServer = new ServerSocket(8080);
            while (true) {
                Socket clientRequest = echoServer.accept();
                executor.execute(new ThreadedServerHandler(clientRequest, i++));
            }
        } catch (IOException e) {
            System.out.println(e);
        }
    }

    public static void main(String[] args) throws IOException {
        new MultiThreadedEchoServer(8080).startServer();

    }
}
package com.demo.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

public class ThreadedServerHandler implements Runnable {
    Socket clientSocket = null;
    int clientNo = 0;

    ThreadedServerHandler(Socket socket, int i) {
        if (socket != null) {
            clientSocket = socket;
            clientNo = i;
            System.out.println("創(chuàng)建線程為[" + clientNo + "]號(hào)客戶服務(wù)...");
        }
    }

    @Override
    public void run() {
        PrintStream os = null;
        BufferedReader in = null;
        try {
            in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            os = new PrintStream(clientSocket.getOutputStream());
            String inputLine;
            while ((inputLine = in.readLine()) != null) {

                // 輸入'Quit'退出
                if (inputLine.equals("Quit")) {
                    System.out.println("關(guān)閉與客戶端[" + clientNo + "]......" + clientNo);
                    os.close();
                    in.close();
                    clientSocket.close();
                    break;
                } else {
                    System.out.println("來自客戶端[" + clientNo + "]的輸入: [" + inputLine + "]!");
                    os.println("來自服務(wù)器端的響應(yīng):" + inputLine);
                }
            }
        } catch (IOException e) {
            System.out.println("Stream closed");
        }
    }
}
package com.demo.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class EchoClient {
    public static void main(String[] args) {

        Socket echoSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;

        try {

            echoSocket = new Socket("127.0.0.1", 8080);
            out = new PrintWriter(echoSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(
                    echoSocket.getInputStream()));
            System.out.println("連接到服務(wù)器......");
            System.out.println("請(qǐng)輸入消息[輸入\"Quit\"]退出:");
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(
                    System.in));
            String userInput;

            while ((userInput = stdIn.readLine()) != null) {
                out.println(userInput);
                System.out.println(in.readLine());

                if (userInput.equals("Quit")) {
                    System.out.println("關(guān)閉客戶端......");
                    out.close();
                    in.close();
                    stdIn.close();
                    echoSocket.close();
                    System.exit(1);
                }
                System.out.println("請(qǐng)輸入消息[輸入\"Quit\"]退出:");
            }
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host");
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for "
                    + "the connection ");
            System.exit(1);
        }
    }
}

3、Java NIO簡介

1、變遷

2、 Java IO vs NIO

image.png

4、 Java NIO組件之Buffer

Java NIO Buffer:一個(gè)Buffer本質(zhì)上是內(nèi)存中的一塊,可以將數(shù)據(jù)寫入這塊內(nèi)存,從這塊內(nèi)存中獲取數(shù)據(jù)
java.nio定義了以下幾種Buffer的實(shí)現(xiàn)

image.png

1、Buffer中有三個(gè)主要概念:

image.png
  • capacity:

    • 代表這個(gè)緩沖區(qū)的容量,一旦設(shè)定就不可以更改;比如capacity為1024的IntBuffer,代表其一次可以存放1024個(gè)int類型的值;
    • 一旦buffer的容量達(dá)到capacity,需要清空buffer,才能重新寫入值
  • position:位置索引,表示當(dāng)前正在操作(讀寫)時(shí)的位置

    • 從寫操作模式到讀操作模式切換的時(shí)候(flip),position 都會(huì)歸零,這樣就可以從頭開始讀寫了;
  • limit:讀寫模式下,可操作(讀寫)的限制;

    • 寫操作模式下,limit 代表的是最大能寫入的數(shù)據(jù),這個(gè)時(shí)候 limit 等于 capacity;
    • 讀操作模式下,limit 等于 Buffer 中實(shí)際的數(shù)據(jù)大小,因?yàn)?Buffer 不一定被寫滿了。
image.png

2、Direct ByteBuffer VS. non-direct ByteBuffer

  • Non-direct ByteBuffer
    -> HeapByteBuffer,標(biāo)準(zhǔn)的java類
    -> 維護(hù)一份byte[]在JVM堆上
    -> 創(chuàng)建開銷小
  • Direct ByteBuffer
    -> DirectByteBuffer,底層存儲(chǔ)在非JVM堆上,通過native代碼操作
    -> -XX:MaxDirectMemorySize=<size>
    -> 創(chuàng)建開銷大
image.png
image.png
image.png

3、Buffer API:

  • buffer創(chuàng)建
    • allocate/allocateDirect
    //創(chuàng)建一個(gè)容量為10的byte緩沖區(qū)
    ByteBuffer buff = ByteBuffer.allocate(10);
    //創(chuàng)建一個(gè)容量為10的char緩沖區(qū)
    CharBuffer buff = CharBuffer.allocate(10);
    
    • wrap
    //使用一個(gè)指定數(shù)組作為緩沖區(qū)的存儲(chǔ)器,
    //緩沖區(qū)的數(shù)據(jù)會(huì)存放在bytes數(shù)組中,
    //bytes數(shù)組或buff緩沖區(qū)任何一方中數(shù)據(jù)的改動(dòng)都會(huì)影響另一方
    byte[] bytes = new byte[10];
    ByteBuffer buff = ByteBuffer.wrap(bytes);
    //創(chuàng)建指定初始位置(position)和上界(limit)的緩沖區(qū):
    byte[] bytes = new byte[10];
    ByteBuffer buff = ByteBuffer.wrap(bytes, 3, 8);
    
  • buffer讀取
    • put/get:使用get()從緩沖區(qū)中取數(shù)據(jù),使用put()向緩沖區(qū)中存數(shù)據(jù)
    • flip:將一個(gè)處于存數(shù)據(jù)(put)狀態(tài)的緩沖區(qū)變?yōu)橐粋€(gè)處于準(zhǔn)備取數(shù)(get)的狀態(tài)
    • mark/reset:使用mark記住當(dāng)前位置(標(biāo)記),之后可以使用reset將位置恢復(fù)到標(biāo)記處
    • compact:壓縮,將已讀取了的數(shù)據(jù)丟棄,保留未讀取的數(shù)據(jù)并將保留的數(shù)據(jù)重新填充到緩沖區(qū)的頂部,然后繼續(xù)向緩沖區(qū)寫入數(shù)據(jù);
    • rewind/clear : 設(shè)置讀寫position為0;
public final Buffer rewind() {
    position = 0;
    mark = -1;  //取消標(biāo)記
    return this;
    }
public final Buffer clear(){
      position = 0; //重置當(dāng)前讀寫位置
      limit = capacity; 
      mark = -1;  //取消標(biāo)記
      return this;
}
package com.demo.nio.buffers;

import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;

public class BufferAccess {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        printBuffer(buffer);
        
        buffer.put((byte)'H').put((byte)'e').put((byte)'l').put((byte)'l').put((byte)'o');
        printBuffer(buffer);
        // 翻轉(zhuǎn)緩沖區(qū)
        buffer.flip();
        printBuffer(buffer);

        //取buffer
        System.out.println("" + (char) buffer.get() + (char) buffer.get());
        printBuffer(buffer);

        buffer.mark();
        printBuffer(buffer);

        //讀取兩個(gè)元素后,恢復(fù)到之前mark的位置處
        System.out.println("" + (char) buffer.get() + (char) buffer.get());
        printBuffer(buffer);

        buffer.reset();
        //buffer.rewind();

        printBuffer(buffer);

        //壓縮,將已讀取了的數(shù)據(jù)丟棄,
        // 保留未讀取的數(shù)據(jù)并將保留的數(shù)據(jù)重新填充到緩沖區(qū)的頂部,然后繼續(xù)向緩沖區(qū)寫入數(shù)據(jù)
        buffer.compact();
        printBuffer(buffer);

        buffer.clear();
        printBuffer(buffer);

    }
    
    private static void printBuffer(Buffer buffer) {
        System.out.println("[limit=" + buffer.limit() 
                +", position = " + buffer.position()
                +", capacity = " + buffer.capacity()
                +", array = " + new String((byte[]) buffer.array()) +"]");
    }
}
  • buffer復(fù)制(淺復(fù)制)
    • duplicate():復(fù)制一個(gè)可讀可寫的緩沖區(qū)
    • asReadOnlyBuffer():復(fù)制一個(gè)只讀緩沖區(qū)
    • slice():復(fù)制一個(gè)從源緩沖position到limit的新緩沖區(qū)

5、 Java NIO組件之 Channel

所有的 NIO 操作始于通道,通道是數(shù)據(jù)來源或數(shù)據(jù)寫入的目的地
java.nio 包中主要實(shí)現(xiàn)的以下幾個(gè) Channel:
FileChannel:文件通道,用于文件的讀和寫;
DatagramChannel:用于 UDP 連接的接收和發(fā)送;
SocketChannel:TCP 連接通道,簡單理解就是 TCP 客戶端;
ServerSocketChannel:TCP 對(duì)應(yīng)的服務(wù)端,用于監(jiān)聽某個(gè)端口進(jìn)來的請(qǐng)求;

image.png
image.png
image.png

6、 Java NIO組件之 Selector

java.nio.channels.Selector,支持IO多路復(fù)用的抽象實(shí)體;
用于檢查一個(gè)或多個(gè)NIO Channel的狀態(tài)是否處于可讀、可寫;
這樣的話,可以實(shí)現(xiàn)單線程管理多個(gè)channels,也就是可以管理多個(gè)網(wǎng)絡(luò)鏈接,如下圖;

image.png
  • 創(chuàng)建Selector
Selector selector = Selector.open();
  • 注冊Channel到Selector上
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

//register的第二個(gè)參數(shù),這個(gè)參數(shù)是一個(gè)“關(guān)注集合”,代表關(guān)注的channel狀態(tài),
//有四種基礎(chǔ)類型可供監(jiān)聽, 用SelectionKey中的常量表示如下:
//SelectionKey.OP_CONNECT
//SelectionKey.OP_ACCEPT
//SelectionKey.OP_READ
//SelectionKey.OP_WRITE
  • 從Selector中選擇channel
    Selector.select 更新所有就緒的SelectionKey的狀態(tài),并返回就緒的channel個(gè)數(shù);
    一旦向Selector注冊了一個(gè)或多個(gè)channel后,就可以調(diào)用select來獲取channel,select方法會(huì)返回所有處于就緒狀態(tài)的channel,
    select()方法的返回值是一個(gè)int,代表有多少channel處于就緒了。也就是自上一次select后有多少channel進(jìn)入就緒
int select()
int select(long timeout)
int selectNow()

迭代Selected Key集合并處理就緒channel
在調(diào)用select并返回了有channel就緒之后,可以通過選中的key集合來獲取channel,這個(gè)操作通過調(diào)用selectedKeys()方法:

Set<SelectionKey> selectedKeys = selector.selectedKeys();  

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
image.png

7、Demo

package com.demo.nio.demo;

import java.io.IOException;

 
public class NIOEchoServer {

 
    public static void main(String[] args) throws IOException {
    int port = 8080;
    if (args != null && args.length > 0) {
        try {
        port = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
        // 采用默認(rèn)值
        }
    }
    EchoHandler timeServer = new EchoHandler(port);
    new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}
package com.demo.nio.demo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class EchoHandler implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;
    private int num = 0;

    public EchoHandler(int port) {
        try {
            //創(chuàng)建Selector
            selector = Selector.open();
            //創(chuàng)建ServerSocketChannel并注冊到Selector上,關(guān)注Accept事件
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服務(wù)器在端口[" + port + "]等待客戶請(qǐng)求......");
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                //更新所有就緒的SelectionKey的狀態(tài),并返回就緒的channel個(gè)數(shù)
                if(selector.select(1000)==0)
                                    continue;
                 //迭代Selected Key集合并處理就緒channel
                //事件處理循環(huán):遍歷就緒的channel,分別進(jìn)行處理
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路復(fù)用器關(guān)閉后,所有注冊在上面的Channel和Pipe等資源都會(huì)被自動(dòng)去注冊并關(guān)閉,所以不需要重復(fù)釋放資源
        if (selector != null)
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
    }

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 處理新接入的請(qǐng)求消息,Accept事件
            if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = ssc.accept(); // Non blocking, never null
                socketChannel.configureBlocking(false);
                SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_READ);

                sk.attach(num++);
            }
             //處理read事件
            if (key.isReadable()) {
                // 讀取數(shù)據(jù)
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("來自客戶端[" + key.attachment() + "]的輸入: [" + body.trim() + "]!");

                    if (body.trim().equals("Quit")) {
                        System.out.println("關(guān)閉與客戶端[" + key.attachment() + "]......");
                        key.cancel();
                        sc.close();
                    } else {
                        String response = "來自服務(wù)器端的響應(yīng):" + body;
                        doWrite(sc, response);
                    }

                } else if (readBytes < 0) {
                    // 對(duì)端鏈路關(guān)閉
                    key.cancel();
                    sc.close();
                } else {

                }
            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}
package com.demo.nio.demo;

public class NIOEchoClient {

    public static void main(String[] args) {

        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
            }
        }
        new Thread(new NIOEchoClientHandler("127.0.0.1", port), "NIOEchoClient-001").start();
    }
}
package com.demo.nio.demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NIOEchoClientHandler implements Runnable {

    private String host;
    private int port;

    private Selector selector;
    private SocketChannel socketChannel;
    
    private ExecutorService executorService;

    private volatile boolean stop;

    public NIOEchoClientHandler(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        this.executorService= Executors.newSingleThreadExecutor();

        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress(host, port));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null)
                                key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路復(fù)用器關(guān)閉后,所有注冊在上面的Channel和Pipe等資源都會(huì)被自動(dòng)去注冊并關(guān)閉,所以不需要重復(fù)釋放資源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        if(executorService != null) {
            executorService.shutdown();
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 判斷是否連接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    System.out.println("連接到服務(wù)器......");
                    
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    System.out.println("請(qǐng)輸入消息[輸入\"Quit\"]退出:");

                    executorService.submit(() -> {
                        while(true) {
                            try {
                                buffer.clear();
                                InputStreamReader input = new InputStreamReader(System.in);
                                BufferedReader br = new BufferedReader(input);
                                
                                String msg = br.readLine();
                                
                                if (msg.equals("Quit")) {
                                    System.out.println("關(guān)閉客戶端......");
                                    key.cancel();
                                    sc.close();
                                    this.stop = true;
                                    break;
                                }
                                
                                buffer.put(msg.getBytes());
                                buffer.flip();
                                
                                sc.write(buffer);
                                
                                System.out.println("請(qǐng)輸入消息[輸入\"Quit\"]退出:");

                            } catch (Exception ex) {
                                ex.printStackTrace();
                            }
                        }                       
                    });
                    sc.register(selector, SelectionKey.OP_READ);
                } else {
                    System.exit(1); // 連接失敗,進(jìn)程退出
                }
            }
            
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println(body);
                    
                    if(body.equals("Quit"))
                    {
                        this.stop = true;
                    }
                } else if (readBytes < 0) {
                    // 對(duì)端鏈路關(guān)閉
                    key.cancel();
                    sc.close();
                } else
                    ; // 讀到0字節(jié),忽略
            }
            
            if(key.isWritable()){
                 System.out.println("The key is writable");
            }
        }
    }
 

    private void doWrite(SocketChannel sc) throws IOException {
/*      System.out.println("請(qǐng)輸入消息[輸入\"Quit\"]退出:");
        BufferedReader stdIn = new BufferedReader(new InputStreamReader(
                System.in));
        String userInput;

        while ((userInput = stdIn.readLine()) != null) {
            out.println(userInput);
            System.out.println(in.readLine());*/
            byte[] req = "QUERY TIME ORDER".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining())
                System.out.println("Send order 2 server succeed.");

/*          
            if (userInput.equals("Quit")) {
                System.out.println("Closing client");
                out.close();
                in.close();
                stdIn.close();
                echoSocket.close();
                System.exit(1);
            }
            System.out.println("請(qǐng)輸入消息[輸入\"Quit\"]退出:");

        }*/
        
    }
}

8、NIO錯(cuò)誤認(rèn)識(shí)

1、使用NIO = 高性能
其實(shí)并不一定,在一些場景下,使用NIO并不一定更快,比如

  • 客戶端應(yīng)用
  • 連接數(shù)<1000
  • 并發(fā)程度不高
  • 局域網(wǎng)環(huán)境下

2、NIO完全屏蔽了平臺(tái)差異
NO,NIO仍然是基于各個(gè)OS平臺(tái)的IO系統(tǒng)實(shí)現(xiàn)的,差異仍然存在

3、使用NIO做網(wǎng)絡(luò)編程很容易

  • 離散的事件驅(qū)動(dòng)模型,編程困難
  • 陷阱重重
    (其實(shí),真正簡化網(wǎng)絡(luò)編程的是Netty)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、Socket通道 新的socket通道類可以運(yùn)行非阻塞模式并且是可選擇的。這兩個(gè)性能可以激活大程序(如網(wǎng)絡(luò)服務(wù)...
    Java架構(gòu)師筆記閱讀 2,645評(píng)論 0 3
  • Java NIO提供了與標(biāo)準(zhǔn)IO不同的IO工作方式: Channels and Buffers(通道和緩沖區(qū)):標(biāo)...
    Java面試指南閱讀 2,585評(píng)論 0 2
  • 原文 先來回顧一下傳統(tǒng)的IO模式的,將傳統(tǒng)的IO模式的相關(guān)類理清楚(因?yàn)镮O的類很多)。 但是,發(fā)現(xiàn)在整理的過程已...
    baby_buibui閱讀 1,527評(píng)論 2 35
  • Java NIO(New IO)是從Java 1.4版本開始引入的一個(gè)新的IO API,可以替代標(biāo)準(zhǔn)的Java I...
    JackChen1024閱讀 7,969評(píng)論 1 143
  • 晚上,媽媽正幫我默單詞,這時(shí),我抬頭看到窗外美麗的煙火,我叫媽媽把燈關(guān)掉,媽媽答應(yīng)了。于是,我們一起欣賞起來。煙...
    劉金寶_7db3閱讀 286評(píng)論 0 0

友情鏈接更多精彩內(nèi)容