Netty學(xué)習(xí)前基本知識 — BIO 、NIO 、AIO 總結(jié)

點(diǎn)贊再看,養(yǎng)成習(xí)慣,公眾號搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有我的系列文章。

前言

熟練掌握 BIO、NIO、AIO 的基本概念以及一些常見問題是你準(zhǔn)備面試的過程中不可或缺的一部分,另外這些知識點(diǎn)也是你學(xué)習(xí) Netty 的基礎(chǔ)。

基本概念

IO模型就是說用什么樣的通道進(jìn)行數(shù)據(jù)的發(fā)送和接收,Java 共支持3中網(wǎng)絡(luò)變成 IO 模式:BIO、NIO、AIO。Java 中的 BIO、NIO 和 AIO 理解為是 Java 語言對操作系統(tǒng)的各種 IO 模型的封裝。我們在使用這些 API 的時候,不需要關(guān)系操作系統(tǒng)層面的知識,也不需要根據(jù)不同操作系統(tǒng)編寫不同的代碼。

在講 BIO、NIO、AIO 之前先回顧幾個概念:同步與異步、阻塞與非阻塞、I/O模型。

同步與異步

  • 同步:同步就是發(fā)起一個調(diào)用后,被調(diào)用者未處理完請求之前,調(diào)用不返回。
  • 異步:異步就是發(fā)一個調(diào)用后,立刻得到被調(diào)用者的回應(yīng)表示已接收到請求,但是被調(diào)用者并沒有返回結(jié)果,此時可以處理其他的請求,被調(diào)用者通常依靠事件、回調(diào)等機(jī)制來通知調(diào)用者其返回結(jié)果。

同步和異步的區(qū)別最大在于異步的話調(diào)用者不需要等待結(jié)果處理,被調(diào)用者會通過回調(diào)等機(jī)制來通知調(diào)用者返回結(jié)果。

阻塞和非阻塞

  • 阻塞:阻塞就是發(fā)起一個請求,調(diào)用者一直等待請求結(jié)果返回,也就是當(dāng)前線程會被掛起,無法從事其他任務(wù),只有當(dāng)條件就緒才能繼續(xù)。
  • 非阻塞:非阻塞就是發(fā)起一個請求,調(diào)用者不用一直等著結(jié)果返回,可以先去干其他的事情。

同步異步與阻塞非阻塞(段子)

老張燒開水的故事(故事來源網(wǎng)絡(luò))

老張愛喝茶,廢話不說,煮開水。

出場人物:老張,水壺兩把(普通水壺,簡稱水壺;會響的水壺,簡稱響水壺)。

  1. 老張把水壺放到火上,立等水開。(同步阻塞

老張覺得自己有點(diǎn)傻

  1. 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞

老張還是覺得自己有點(diǎn)傻,于是變高端了,買了把會響笛的那種水壺。水開之后,能大聲發(fā)出嘀~~的噪音。

  1. 老張把響水壺放到火上,立等水開。(異步阻塞

老張覺得這樣傻等意義不大

  1. 老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(異步非阻塞

老張覺得自己聰明了。

所謂同步異步,只是對于水壺而言

  • 普通水壺:同步;響水壺:異步。
  • 雖然都能干活,但響水壺可以在自己完工之后,提示老張水開了,這是普通水壺所不能及的。
  • 同步只能讓調(diào)用者去輪詢自己(情況2中),造成老張效率的低下。

所謂阻塞非阻塞,僅僅對于老張而言

  • 立等的老張:阻塞;看電視的老張:非阻塞。
  • 情況1 和 情況3 中老張就是阻塞的,媳婦喊他都不知道。雖然情況3中響水壺是異步的,可對于立等的老張沒有太大的意義。所以一般異步是配合非阻塞使用的,這樣才能發(fā)揮異步的效用。

常見的 I/O 模型對比

所有的系統(tǒng) I/O 都分為兩個階段:等待就緒 和 操作。

舉例來說,讀函數(shù),分為等待系統(tǒng)可讀和真正的讀;同理,寫函數(shù)分為等待網(wǎng)卡可以寫和真正的寫。

需要說明的是等待就緒的阻塞是不使用 CPU 的,是在“空等”;而真正的讀操作的阻塞是使用 CPU 的,真正在“干活”,而且這個過程非???,屬于 memory copy,帶寬通常在 1GB/s 級別以上,可以理解為基本不耗時。

如下幾種常見 I/O 模型的對比:


以socket.read()為例子

  • 傳統(tǒng)的BIO里面socket.read(),如果TCP RecvBuffer里沒有數(shù)據(jù),函數(shù)會一直阻塞,直到收到數(shù)據(jù),返回讀到的數(shù)據(jù)。
  • 對于NIO,如果TCP RecvBuffer有數(shù)據(jù),就把數(shù)據(jù)從網(wǎng)卡讀到內(nèi)存,并且返回給用戶;反之則直接返回0,永遠(yuǎn)不會阻塞。
  • AIO(Async I/O)里面會更進(jìn)一步:不但等待就緒是非阻塞的,就連數(shù)據(jù)從網(wǎng)卡到內(nèi)存的過程也是異步的。

換句話說,BIO里用戶最關(guān)心“我要讀”,NIO里用戶最關(guān)心"我可以讀了",在AIO模型里用戶更需要關(guān)注的是“讀完了”。

NIO一個重要的特點(diǎn)是:socket主要的讀、寫、注冊和接收函數(shù),在等待就緒階段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。

BIO(Blocking I/O)

同步阻塞 I/O 模式,數(shù)據(jù)的讀取寫入必須阻塞在一個線程內(nèi)等待其完成(一個客戶端連接對于一個處理線程)。

傳統(tǒng) BIO

BIO通信(一請求一應(yīng)答)模型圖如下(圖源網(wǎng)絡(luò)):


采用 BIO 通信模型 的服務(wù)隊(duì),通常由一個獨(dú)立的 Acceptor 線程負(fù)責(zé)監(jiān)聽客戶端的連接。我們一般通過在 while(true) 循環(huán)中服務(wù)端會調(diào)用 accept() 方法等待客戶端連接的方式監(jiān)聽請求,請求一旦接收到一個連接請求,就可以建立通信套接字在這個通信套接字上進(jìn)行讀寫操作,此時不能再接收其他客戶端連接請求,只能等待當(dāng)前連接的客戶端的操作執(zhí)行完成,不過可以通過多線程來支持多個客戶端的連接,如上圖所示。

如果要讓 BIO 通信模型 能夠同時處理多個客戶端的請求,就必須使用多線程(要原因是 socket.accept()、 socket.read()socket.write() 涉及的三個主要函數(shù)都是同步阻塞的),當(dāng)一個連接在處理 I/O 的時候,系統(tǒng)是阻塞的,如果是單線程的必然就掛死在哪里。開啟多線程,就可以讓CPU去處理更多的事情。也就是說它在接收到客戶端連接請求之后為每個客戶端創(chuàng)建一個新的線程進(jìn)行鏈路處理,處理完成之后,通過輸出流返回給客戶端,線程銷毀。這就是典型的 一請求一應(yīng)答通信模型。

其實(shí)這也是所有使用多線程的本質(zhì):

  • 利用多核
  • 當(dāng) I/O 阻塞系統(tǒng),但 CPU 空閑的時候,可以利用多線程使用 CPU 資源。

我們可以設(shè)想以下如果連接不做任何的事情的話就會造成不必要的線程開銷,不過可以通過 線程池機(jī)制 改善,線程池還可以讓線程的創(chuàng)建和回收成本相對較低。例如使用FixedTreadPool 可以有效的控制來線程的最大數(shù)量,保證來系統(tǒng)有限的資源的控制,實(shí)現(xiàn)了N(客戶端請求數(shù)量):M(處理客戶端請求的線程數(shù)量)的偽異步I/O模型(N可以遠(yuǎn)遠(yuǎn)大于M)。

我們再設(shè)想以下當(dāng)客戶端并發(fā)訪問量增加后這種模型會出什么問題?
隨著并發(fā)訪問量增加會導(dǎo)致線程數(shù)急劇膨脹可能會導(dǎo)致線程堆棧溢出、創(chuàng)建新線程失敗等問題,最終導(dǎo)致進(jìn)程宕機(jī)或者僵死,不能對外提供服務(wù)。

在Java虛擬機(jī)中,線程是寶貴的資源,主要體現(xiàn)在:

  1. 線程的創(chuàng)建和銷毀成本很高,尤其在 Linux 操作系統(tǒng)中,線程本質(zhì)上就是一個進(jìn)程,創(chuàng)建和銷毀線程都是重量級的系統(tǒng)函數(shù);
  2. 線程本身占用較大內(nèi)存,像 Java 的線程棧,一般至少分配 512k~1M 的空間,如果系統(tǒng)中的線程數(shù)過千,恐怕整個 JVM 的內(nèi)存都會被吃掉一半;
  3. 線程的切換成本也很高。操作系統(tǒng)發(fā)生線程切換的時候,需要保留線程的上下文,然后執(zhí)行系統(tǒng)調(diào)用。如果線程數(shù)過高,可能執(zhí)行線程切換的時間甚至?xí)笥诰€程執(zhí)行的時間,這時候帶來的表現(xiàn)往往是系統(tǒng)load偏高,CPU sy 使用率特別高(超過20%以上),導(dǎo)致系統(tǒng)幾乎陷入不可用的狀態(tài);
  4. 容易造成鋸齒狀的系統(tǒng)負(fù)載。因?yàn)橄到y(tǒng)的負(fù)載是用活動線程數(shù)和CPU核心數(shù),一旦線程數(shù)量高但外部網(wǎng)絡(luò)環(huán)境不是很穩(wěn)定,就很容易造成大量請求的結(jié)果同時返回,激活大量阻塞線程從而使系統(tǒng)負(fù)載壓力過大。
    Linux系統(tǒng)中CPU中sy過高> sy的值表示是內(nèi)核的消耗,如果出現(xiàn)sy的值過高,不要先去考慮是內(nèi)核的問題,先查看是不是內(nèi)存不足,是不是磁盤滿,是不是IO的問題,就是說先考慮自己進(jìn)程的問題,比方是否IO引起的問題,是否網(wǎng)絡(luò)引起的問題的。排查系統(tǒng)IO或者網(wǎng)絡(luò)等是否已經(jīng)到瓶頸了。

偽異步 I/O

為了解決同步阻塞I/O面臨的一個鏈路需要一個線程處理的問題,后來有人對它的線程模型進(jìn)行了優(yōu)化:后端通過一個線程池來處理多個客戶端的請求接入,形成客戶端個數(shù)M:線程池最大線程數(shù)N的比例關(guān)系,其中M可以遠(yuǎn)遠(yuǎn)大于N.通過線程池可以靈活地調(diào)配線程資源,設(shè)置線程的最大值,防止由于海量并發(fā)接入導(dǎo)致線程耗盡。

偽異步IO模型圖(圖源網(wǎng)絡(luò))



采用線程池和任務(wù)隊(duì)列可以實(shí)現(xiàn)一種叫做偽異步的 I/O 通信框架,它的模型圖如上圖所示。當(dāng)有新的客戶端接入時,將客戶端的 Socket 封裝成一個Task(該任務(wù)實(shí)現(xiàn)java.lang.Runnable接口)投遞到后端的線程池中進(jìn)行處理,JDK 的線程池維護(hù)一個消息隊(duì)列和 N 個活躍線程,對消息隊(duì)列中的任務(wù)進(jìn)行處理。由于線程池可以設(shè)置消息隊(duì)列的大小和最大線程數(shù),因此,它的資源占用是可控的,無論多少個客戶端并發(fā)訪問,都不會導(dǎo)致資源的耗盡和宕機(jī)。

偽異步I/O通信框架采用了線程池實(shí)現(xiàn),因此避免了為每個請求都創(chuàng)建一個獨(dú)立線程造成的線程資源耗盡問題。不過因?yàn)樗牡讓尤稳皇峭阶枞腂IO模型,因此無法從根本上解決問題。

缺點(diǎn)

  1. IO 代碼里 read 操作是阻塞操作,如果連接不做數(shù)據(jù)讀寫操作會導(dǎo)致線程阻塞,浪費(fèi)資源;
  2. 如果線程很多,會導(dǎo)致服務(wù)器線程太多,壓力太大。

應(yīng)用場景

BIO 方式適用于連接數(shù)目比較小且固定的架構(gòu),這種方式對服務(wù)器資源要求比較高,但程序簡單理解。


bio.png

BIO 代碼示例

服務(wù)端

package com.niuh.bio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9000);
        while (true) {
            System.out.println("等待連接。。");
            //阻塞方法
            final Socket socket = serverSocket.accept();
            System.out.println("有客戶端連接了。。");

            // 多線程處理
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        handler(socket);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            // 單線程處理
            //handler(socket);

        }
    }

    private static void handler(Socket socket) throws IOException {
        System.out.println("thread id = " + Thread.currentThread().getId());
        byte[] bytes = new byte[1024];

        System.out.println("準(zhǔn)備read。。");
        //接收客戶端的數(shù)據(jù),阻塞方法,沒有數(shù)據(jù)可讀時就阻塞
        int read = socket.getInputStream().read(bytes);
        System.out.println("read完畢。。");
        if (read != -1) {
            System.out.println("接收到客戶端的數(shù)據(jù):" + new String(bytes, 0, read));
            System.out.println("thread id = " + Thread.currentThread().getId());

        }
        socket.getOutputStream().write("HelloClient".getBytes());
        socket.getOutputStream().flush();
    }
}

客戶端

package com.niuh.bio;

import java.io.IOException;
import java.net.Socket;

public class SocketClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1", 9000);
        //向服務(wù)端發(fā)送數(shù)據(jù)
        socket.getOutputStream().write("HelloServer".getBytes());
        socket.getOutputStream().flush();
        System.out.println("向服務(wù)端發(fā)送數(shù)據(jù)結(jié)束");
        byte[] bytes = new byte[1024];
        //接收服務(wù)端回傳的數(shù)據(jù)
        socket.getInputStream().read(bytes);
        System.out.println("接收到服務(wù)端的數(shù)據(jù):" + new String(bytes));
        socket.close();
    }
}

NIO(Non Blocking IO)

同步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個線程可以處理多個請求(連接),客戶端發(fā)送的連接請求都會注冊到 多路復(fù)用器 selector 上,多路復(fù)用器輪詢到連接有 IO 請求就進(jìn)行處理。

它支持面向緩沖的,基于通道的I/O操作方法。NIO提供了與傳統(tǒng)BIO模型中的 SocketServerSocket 相對應(yīng)的 SocketChannelServerSocketChannel 兩種不同的套接字通道實(shí)現(xiàn),兩種通道都支持阻塞和非阻塞兩種模式。

  • 阻塞模式使用就像傳統(tǒng)中的支持一樣,比較簡單,但是性能和可靠性都不好;
  • 非阻塞模式正好與之相反。

對于低負(fù)載、低并發(fā)的應(yīng)用程序,可以使用同步阻塞I/O來提升開發(fā)速率和更好的維護(hù)性;
對于高負(fù)載、高并發(fā)的(網(wǎng)絡(luò))應(yīng)用,應(yīng)使用 NIO 的非阻塞模式來開發(fā)。

NIO核心組件

NIO 有三大核心組件:

  • Channel通道
  • Buffer緩沖區(qū)
  • Selector選擇器

整個NIO體系包含的類遠(yuǎn)遠(yuǎn)不止這三個,只能說這三個是NIO體系的“核心API”。


nio2.png
  1. channel 類似于流,每個 channel 對應(yīng)一個 buffer 緩沖區(qū),buffer 底層就是個數(shù)組;
  2. channel 會注冊到 selector 上,由 selector 根據(jù) channel 讀寫事件的發(fā)生將其交由某個空閑的線程處理;
  3. selector 可以對應(yīng)一個或多個線程
  4. NIO 的 Buffer 和 channel 既可以讀也可以寫

NIO的特性

我們從一個問題來總結(jié):NIO 與 IO 的區(qū)別?

如果是在面試中來回答這個問題,我覺得首先肯定要從 NIO 流是非阻塞 IO,而 IO 流是阻塞 IO說起。然后可以從 NIO 的3個核心組件/特性為 NIO 帶來的一些改進(jìn)來分析。

IO流是阻塞的,NIO流不是阻塞的

Java NIO 使我們可以進(jìn)行非阻塞 IO 操作。比如說,單線程中從通道讀取數(shù)據(jù)到 buffer,同時可以繼續(xù)做別的事情,當(dāng)數(shù)據(jù)讀取到 buffer 中后,線程再繼續(xù)處理數(shù)據(jù)。寫數(shù)據(jù)也是一樣的。另外,非阻塞寫也是日常,一個線程請求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。

Java IO 的各種流是阻塞的,這意味這,當(dāng)一個線程調(diào)用 read() 或 write() 時,該線程被阻塞,直到有一些數(shù)據(jù)被讀取或數(shù)據(jù)完全寫入。該線程在此期間不能再干任何事情了。

IO 面向流(Stream oriented),NIO 面向緩沖區(qū)(Buffer oriented)

Buffer(緩沖區(qū))

Buffer 是一個對象,它包含一些要寫入或者要讀出的數(shù)據(jù)。在 NIO 類庫中加入 Buffer對象,體現(xiàn)了新庫與原庫 I/O的一個重要區(qū)別:

  • 在面向流的 I/O 中,可以直接將數(shù)據(jù)寫入或者將數(shù)據(jù)直接讀到 Stream 對象中。雖然 Stream 中也有 Buffer 開通的擴(kuò)展類,但只是流的包裝類,還從流讀到緩沖區(qū)。
  • NIO 是直接讀到 Buffer 中進(jìn)行操作。在 NIO 庫中,所有的數(shù)據(jù)都是用緩沖區(qū)處理的。在讀取數(shù)據(jù)時,它是直接讀到緩沖區(qū)中的;在寫入數(shù)據(jù)時,寫入到緩存中。任何時候訪問 NIO 中的數(shù)據(jù),都是通過緩沖區(qū)進(jìn)行操作。

最常用的緩沖區(qū)是 ByteBuffer,ByteBuffer 提供流一組功能用于操作 byte 數(shù)組。除了 ByteBuffer 還有其他的一些緩沖區(qū),事實(shí)上,每一種 Java 基本類型(除了 Boolean 類型)都對應(yīng)有一種緩沖區(qū)。

NIO 通過 Channel(通道)進(jìn)行讀寫

Channel(通道)

通道是雙向的,可讀也可以寫,而流的讀寫是單向的。無論讀寫,通道只能和 Buffer 交互。因?yàn)?Buffer,通道可以異步地讀寫。

NIO 有選擇器,而 IO 沒有

Selectors(選擇器)

選擇器用于使用單線程處理多個通道。因此,它需要較少的線程來處理這些通道。線程之間的切換對于操作系統(tǒng)來說是昂貴的。因此,為了提供系統(tǒng)效率選擇器是有用的。

NIO 讀數(shù)據(jù)和寫數(shù)據(jù)

通常來說 NIO 中的所有 IO 都是從 Channel(通道)開始的。

  • 從通道進(jìn)行數(shù)據(jù)讀?。簞?chuàng)建一個緩沖區(qū),然后請求通道讀取數(shù)據(jù);
  • 從通道進(jìn)行數(shù)據(jù)寫入:創(chuàng)建一個緩沖區(qū),填充數(shù)據(jù),并要求通道寫入數(shù)據(jù)。

數(shù)據(jù)讀取和寫入操作如下:

應(yīng)用場景

NIO 方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器、彈幕系統(tǒng)、服務(wù)器間通訊、編程比較復(fù)雜。


nio.png

NIO 代碼示例

服務(wù)端

package com.niuh.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {

    //public static ExecutorService pool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws IOException {
        // 創(chuàng)建一個在本地端口進(jìn)行監(jiān)聽的服務(wù)Socket通道.并設(shè)置為非阻塞方式
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //必須配置為非阻塞才能往selector上注冊,否則會報(bào)錯,selector模式本身就是非阻塞模式
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(9000));
        // 創(chuàng)建一個選擇器selector
        Selector selector = Selector.open();
        // 把ServerSocketChannel注冊到selector上,并且selector對客戶端accept連接操作感興趣
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            System.out.println("等待事件發(fā)生。。");
            // 輪詢監(jiān)聽channel里的key,select是阻塞的,accept()也是阻塞的
            int select = selector.select();

            System.out.println("有事件發(fā)生了。。");
            // 有客戶端請求,被輪詢監(jiān)聽到
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                //刪除本次已處理的key,防止下次select重復(fù)處理
                it.remove();
                handle(key);
            }
        }
    }

    private static void handle(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            System.out.println("有客戶端連接事件發(fā)生了。。");
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            //NIO非阻塞體現(xiàn):此處accept方法是阻塞的,但是這里因?yàn)槭前l(fā)生了連接事件,所以這個方法會馬上執(zhí)行完,不會阻塞
            //處理完連接請求不會繼續(xù)等待客戶端的數(shù)據(jù)發(fā)送
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
            //通過Selector監(jiān)聽Channel時對讀事件感興趣
            sc.register(key.selector(), SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            System.out.println("有客戶端數(shù)據(jù)可讀事件發(fā)生了。。");
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //NIO非阻塞體現(xiàn):首先read方法不會阻塞,其次這種事件響應(yīng)模型,當(dāng)調(diào)用到read方法時肯定是發(fā)生了客戶端發(fā)送數(shù)據(jù)的事件
            int len = sc.read(buffer);
            if (len != -1) {
                System.out.println("讀取到客戶端發(fā)送的數(shù)據(jù):" + new String(buffer.array(), 0, len));
            }
            ByteBuffer bufferToWrite = ByteBuffer.wrap("HelloClient".getBytes());
            sc.write(bufferToWrite);
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else if (key.isWritable()) {
            SocketChannel sc = (SocketChannel) key.channel();
            System.out.println("write事件");
            // NIO事件觸發(fā)是水平觸發(fā)
            // 使用Java的NIO編程的時候,在沒有數(shù)據(jù)可以往外寫的時候要取消寫事件,
            // 在有數(shù)據(jù)往外寫的時候再注冊寫事件
            key.interestOps(SelectionKey.OP_READ);
            //sc.close();
        }
    }
}

NIO服務(wù)端程序詳細(xì)分析

  1. 創(chuàng)建一個 ServerSocketChannel 和 Selector ,并將 ServerSocketChannel 注冊到 Selector 上;
  2. Selector 通過 select() 方法監(jiān)聽 channel 事件,當(dāng)客戶端連接時,selector 監(jiān)聽到連接事件,獲取到 ServerSocketChannel 注冊時綁定的 selectionKey;
  3. selectionKey 通過 channel() 方法可以獲取綁定的 ServerSocketChannel;
  4. ServerSocketChannel 通過 accept() 方法得到 SocketChannel;
  5. 將 SocketChannel 注冊到 Selector 上,關(guān)心 read 事件;
  6. 注冊后返回一個 SelectionKey,會和該 SocketChannel 關(guān)聯(lián);
  7. Selector 繼續(xù)通過 select() 方法監(jiān)聽事件,當(dāng)客戶端發(fā)送數(shù)據(jù)給服務(wù)端,Selector 監(jiān)聽到 read 事件,獲取到 SocketChannel 注冊時綁定的 SelectionKey;
  8. SelectionKey 通過 channel() 方法可以獲取綁定的 socketChannel;
  9. 將 socketChannel 里的數(shù)據(jù)讀取出來;
  10. 用 socketChannel 將服務(wù)端數(shù)據(jù)寫回客戶端。

客戶端

package com.niuh.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioClient {
    //通道管理器
    private Selector selector;

    /**
     * 啟動客戶端測試
     *
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NioClient client = new NioClient();
        client.initClient("127.0.0.1", 9000);
        client.connect();
    }

    /**
     * 獲得一個Socket通道,并對該通道做一些初始化的工作
     *
     * @param ip   連接的服務(wù)器的ip
     * @param port 連接的服務(wù)器的端口號
     * @throws IOException
     */
    public void initClient(String ip, int port) throws IOException {
        // 獲得一個Socket通道
        SocketChannel channel = SocketChannel.open();
        // 設(shè)置通道為非阻塞
        channel.configureBlocking(false);
        // 獲得一個通道管理器
        this.selector = Selector.open();

        // 客戶端連接服務(wù)器,其實(shí)方法執(zhí)行并沒有實(shí)現(xiàn)連接,需要在listen()方法中調(diào)
        //用channel.finishConnect() 才能完成連接
        channel.connect(new InetSocketAddress(ip, port));
        //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_CONNECT事件。
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /**
     * 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件,如果有,則進(jìn)行處理
     *
     * @throws IOException
     */
    public void connect() throws IOException {
        // 輪詢訪問selector
        while (true) {
            selector.select();
            // 獲得selector中選中的項(xiàng)的迭代器
            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                // 刪除已選的key,以防重復(fù)處理
                it.remove();
                // 連接事件發(fā)生
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 如果正在連接,則完成連接
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    // 設(shè)置成非阻塞
                    channel.configureBlocking(false);
                    //在這里可以給服務(wù)端發(fā)送信息哦
                    ByteBuffer buffer = ByteBuffer.wrap("HelloServer".getBytes());
                    channel.write(buffer);
                    //在和服務(wù)端連接成功之后,為了可以接收到服務(wù)端的信息,需要給通道設(shè)置讀的權(quán)限。
                    channel.register(this.selector, SelectionKey.OP_READ);                                            // 獲得了可讀的事件
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /**
     * 處理讀取服務(wù)端發(fā)來的信息 的事件
     *
     * @param key
     * @throws IOException
     */
    public void read(SelectionKey key) throws IOException {
        //和服務(wù)端的read方法一樣
        // 服務(wù)器可讀取消息:得到事件發(fā)生的Socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 創(chuàng)建讀取的緩沖區(qū)
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int len = channel.read(buffer);
        if (len != -1) {
            System.out.println("客戶端收到信息:" + new String(buffer.array(), 0, len));
        }
    }
}

總結(jié)

NIO 模型的 selector 就像一個大總管,負(fù)責(zé)監(jiān)聽各種 I/O 事件,然后轉(zhuǎn)交給后端線程去處理。

NIO 相對于 BIO 非阻塞的體現(xiàn)就在:BIO 的后端線程需要阻塞等待客戶端寫數(shù)據(jù)(比如 read 方法),如果客戶端不寫數(shù)據(jù)線程就要阻塞。

NIO 把等到客戶端操作的時候交給了大總管 selector ,selector 負(fù)責(zé)輪詢所有已注冊的客戶端,發(fā)現(xiàn)有事件發(fā)生了才轉(zhuǎn)交給后端線程處理,后端線程不需要做任何阻塞等待,直接處理客戶端事件的數(shù)據(jù)即可,處理完馬上結(jié)束,或返回線程池供其他客戶端事件繼續(xù)使用。還有就是 channel 的讀寫是非阻塞的。

Redis 就是典型的 NIO 線程模型,selector 收集所有的事件并且轉(zhuǎn)給后端線程,線程連續(xù)執(zhí)行所有事件命令并將結(jié)果寫回客戶端。

AIO(Asynchronous I/O)

異步非阻塞, 由操作系統(tǒng)完成后回調(diào)通知服務(wù)端程序啟動線程去處理, 一般適用于連接數(shù)較多且連接時間較長的應(yīng)用。

AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改進(jìn)版 NIO 2,它是異步非阻塞的IO模型。異步 IO 是基于事件和回調(diào)機(jī)制實(shí)現(xiàn)的,也就是應(yīng)用操作之后會直接返回,不會堵塞在那里,當(dāng)后臺處理完成,操作系統(tǒng)會通知相應(yīng)的線程進(jìn)行后續(xù)的操作。

AIO 是異步IO的縮寫,雖然 NIO 在網(wǎng)絡(luò)操作中,提供了非阻塞的方法,但是 NIO 的 IO 行為還是同步的。對于 NIO 來說,我們的業(yè)務(wù)線程是在 IO 操作準(zhǔn)備好時,得到通知,接著就由這個線程自行進(jìn)行 IO 操作,IO操作本身是同步的。(除了 AIO 其他的 IO 類型都是同步的)

應(yīng)用場景

AIO 方式適用于連接數(shù)目多且連接比較長(重操作)的架構(gòu)。

AIO 代碼示例

服務(wù)端

package com.niuh.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOServer {
    public static void main(String[] args) throws Exception {
        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(final AsynchronousSocketChannel socketChannel, Object attachment) {
                try {
                    // 再此接收客戶端連接,如果不寫這行代碼后面的客戶端連接連不上服務(wù)端
                    serverChannel.accept(attachment, this);
                    System.out.println(socketChannel.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            buffer.flip();
                            System.out.println(new String(buffer.array(), 0, result));
                            socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer buffer) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        Thread.sleep(Integer.MAX_VALUE);
    }
}

客戶端

package com.niuh.aio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;

public class AIOClient {

    public static void main(String... args) throws Exception {
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get();
        socketChannel.write(ByteBuffer.wrap("HelloServer".getBytes()));
        ByteBuffer buffer = ByteBuffer.allocate(512);
        Integer len = socketChannel.read(buffer).get();
        if (len != -1) {
            System.out.println("客戶端收到信息:" + new String(buffer.array(), 0, len));
        }
    }
}

BIO、NIO、AIO對比

BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路復(fù)用) 異步非阻塞
編程難度 簡單 復(fù)雜 復(fù)雜
可靠性
吞吐量

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-netty.git

文章持續(xù)更新,可以公眾號搜一搜「 一角錢技術(shù) 」第一時間閱讀,本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容