NIO底層原理

IO基本概念

Linux的內(nèi)核將所有外部設(shè)備都可以看做一個(gè)文件來(lái)操作,而對(duì)一個(gè)文件的讀寫都是通過(guò)內(nèi)核提供的系統(tǒng)調(diào)用,內(nèi)核給我們返回一個(gè)文件描述符file descriptor,文件描述符是一個(gè)數(shù)組索引,指向內(nèi)核維護(hù)的文件表格,應(yīng)用程序?qū)ξ募淖x寫就是通過(guò)文件描述符的操作完成。
那么我們對(duì)與外部設(shè)備的操作都可以看做對(duì)文件進(jìn)行操作。而對(duì)一個(gè)socket的讀寫也會(huì)有相應(yīng)的描述符,稱為socketfd(socket描述符)。

linux將內(nèi)存分為內(nèi)核區(qū),用戶區(qū)。linux內(nèi)核給我們管理所有的硬件資源,應(yīng)用程序通過(guò)調(diào)用系統(tǒng)調(diào)用和內(nèi)核交互,達(dá)到使用硬件資源的目的。應(yīng)用程序通過(guò)系統(tǒng)調(diào)用read發(fā)起一個(gè)讀操作,這時(shí)候內(nèi)核創(chuàng)建一個(gè)文件描述符,并通過(guò)驅(qū)動(dòng)程序向硬件發(fā)送讀指令,并將讀的的數(shù)據(jù)放在這個(gè)描述符對(duì)應(yīng)結(jié)構(gòu)體的內(nèi)核緩存區(qū)中,然后再把這個(gè)數(shù)據(jù)讀到用戶進(jìn)程空間中,這樣完成了一次讀操作;但是大家都知道I/O設(shè)備相比cpu的速度是極慢的。linux提供的read系統(tǒng)調(diào)用,也是一個(gè)阻塞函數(shù)。這樣我們的應(yīng)用進(jìn)程在發(fā)起read系統(tǒng)調(diào)用時(shí),就必須阻塞,就進(jìn)程被掛起而等待文件描述符的讀就緒,那么什么是文件描述符讀就緒,什么是寫就緒?

讀就緒:就是這個(gè)文件描述符的接收緩沖區(qū)中的數(shù)據(jù)字節(jié)數(shù)大于等于套接字接收緩沖區(qū)低水位標(biāo)記的當(dāng)前大??;
寫就緒:該描述符發(fā)送緩沖區(qū)的可用空間字節(jié)數(shù)大于等于描述符發(fā)送緩沖區(qū)低水位標(biāo)記的當(dāng)前大小。(如果是socket fd,說(shuō)明上一個(gè)數(shù)據(jù)已經(jīng)發(fā)送完成)。

接收低水位標(biāo)記和發(fā)送低水位標(biāo)記:由應(yīng)用程序指定,比如應(yīng)用程序指定接收低水位為64個(gè)字節(jié)。那么接收緩沖區(qū)有64個(gè)字節(jié),才算fd讀就緒;
綜上所述,一個(gè)基本的IO,它會(huì)涉及到兩個(gè)系統(tǒng)對(duì)象,一個(gè)是調(diào)用這個(gè)IO的進(jìn)程對(duì)象,另一個(gè)就是系統(tǒng)內(nèi)核,當(dāng)一個(gè)read操作發(fā)生時(shí),它會(huì)經(jīng)歷如下階段:

  1. 進(jìn)程對(duì)象通過(guò)read系統(tǒng)調(diào)用向內(nèi)核發(fā)起讀請(qǐng)求
  2. 內(nèi)核向硬件發(fā)送讀指令,并等待讀就緒。
  3. 內(nèi)核把將要讀取的數(shù)據(jù)復(fù)制到內(nèi)核緩沖區(qū)
  4. 數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶進(jìn)程空間中

同步與異步

所謂同步就是發(fā)出一個(gè)調(diào)用后,在沒(méi)有得到結(jié)果之前該調(diào)用就不返回,就是調(diào)用者主動(dòng)等待這個(gè)調(diào)用的結(jié)果。 異步則相反,調(diào)用發(fā)出后,這個(gè)調(diào)用就直接返回了,調(diào)用者不會(huì)立刻得到結(jié)果,而是在調(diào)用發(fā)出后,被調(diào)用者通過(guò)回調(diào)函數(shù)等方式來(lái)告知調(diào)用者。
舉個(gè)通俗的例子:
你打電話問(wèn)書店老板有沒(méi)有《分布式系統(tǒng)》這本書,如果是同步通信機(jī)制,書店老板會(huì)說(shuō),你稍等,”我查一下”,然后開始查啊查,等查好了(可能是5秒,也可能是一天)告訴你結(jié)果(返回結(jié)果)。
而異步通信機(jī)制,書店老板直接告訴你我查一下啊,查好了打電話給你,然后直接掛電話了(不返回結(jié)果)。然后查好了,他會(huì)主動(dòng)打電話給你。在這里老板通過(guò)“回電”這種方式來(lái)回調(diào)。

本質(zhì)上,訪問(wèn)數(shù)據(jù)的方式,同步需要當(dāng)前線程讀寫數(shù)據(jù),在讀寫數(shù)據(jù)的過(guò)程中數(shù)據(jù)可能還沒(méi)ready,可能會(huì)阻塞,而異步io則是操作系統(tǒng)等數(shù)據(jù)ready之后會(huì)通知進(jìn)程數(shù)據(jù)好了,可以直接讀了

阻塞io模型:

在缺省情形下,所有文件操作都是阻塞的,在進(jìn)程空間中調(diào)用recvfrom,其系統(tǒng)調(diào)用直到數(shù)據(jù)報(bào)到達(dá)且被拷貝到應(yīng)用進(jìn)程的緩沖區(qū)中或者發(fā)生錯(cuò)誤才返回,期間一直在等待。我們就說(shuō)進(jìn)程在從調(diào)用recvfrom開始到它返回的整段時(shí)間內(nèi)是被阻塞的。

image.png

當(dāng)用戶進(jìn)程調(diào)用了recvfrom這個(gè)系統(tǒng)調(diào)用,kernel就開始了IO的第一個(gè)階段,準(zhǔn)備數(shù)據(jù),對(duì)于network IO, 很多時(shí)候數(shù)據(jù)在一開始還沒(méi)有到達(dá),比如還沒(méi)有收到一個(gè)完整的UDP包,這個(gè)時(shí)候kernel就要等待足夠的數(shù)據(jù)到來(lái)。而用戶進(jìn)程這邊,整個(gè)進(jìn)程會(huì)被阻塞,當(dāng)kernel一直等到數(shù)據(jù)準(zhǔn)備好了,它就會(huì)將數(shù)據(jù)從kernel中拷貝到用戶內(nèi)存,然后kernel返回結(jié)果,用戶進(jìn)程才解除block狀態(tài),重新運(yùn)行起來(lái)。

非阻塞IO模型

進(jìn)程把一個(gè)套接口設(shè)置為非阻塞是在通知內(nèi)核:當(dāng)所請(qǐng)求的IO操作不能滿足要求時(shí),不把本進(jìn)程投入睡眠,而是返回一個(gè)錯(cuò)誤。也就是說(shuō)當(dāng)數(shù)據(jù)沒(méi)有到達(dá)時(shí)并不等待,而是以一個(gè)錯(cuò)誤返回。


image.png

從圖中可以看出,當(dāng)用戶進(jìn)程發(fā)出read操作時(shí),如果kernel中的數(shù)據(jù)還沒(méi)有準(zhǔn)備好,它并不會(huì)block用戶進(jìn)程,而是立刻返回一個(gè)error。從用戶進(jìn)程角度講,它發(fā)起一個(gè)read操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè)error時(shí),它就知道數(shù)據(jù)還沒(méi)有準(zhǔn)備好,于是它可以再次發(fā)送read操作,一旦kernel中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存,然后返回。所以,用戶進(jìn)程其實(shí)是需要不斷的主動(dòng)詢問(wèn)kernel數(shù)據(jù)好了沒(méi)有。

IO復(fù)用模型

linux提供select/poll,進(jìn)程通過(guò)將一個(gè)或多個(gè)fd傳遞給select或poll系統(tǒng)調(diào)用,select/poll會(huì)不斷輪詢所負(fù)責(zé)的所有socket,可以偵測(cè)許多fd是否就緒,但select和poll是順序掃描fd是否就緒,并且支持的fd數(shù)量有限。linux還提供了epoll系統(tǒng)調(diào)用,它是基于事件驅(qū)動(dòng)的方式,而不是順序掃描,當(dāng)某個(gè)socket有數(shù)據(jù)到達(dá)了,可以直接通知用戶進(jìn)程,而不需要順序輪詢掃描,提高了效率。


image.png

當(dāng)進(jìn)程調(diào)用了select,整個(gè)進(jìn)程會(huì)被block,同時(shí),kernel會(huì)監(jiān)視所有select負(fù)責(zé)的socket,當(dāng)任何一個(gè)socket的數(shù)據(jù)準(zhǔn)備好了,select就會(huì)返回,這個(gè)圖和阻塞IO的圖其實(shí)并沒(méi)有多大區(qū)別,事實(shí)上,還更差一點(diǎn),因?yàn)檫@里需要使用兩個(gè)System call,select和recvFrom,而blocking io只調(diào)用了一個(gè)system call(recvfrom),但是select的好處在與它可以同時(shí)處理多個(gè)connection,(如果處理的連接數(shù)不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優(yōu)勢(shì)并不是對(duì)于單個(gè)連接能處理得更快,而是在于能處理更多的連接。)

信號(hào)驅(qū)動(dòng)異步IO模型

首先開啟套接口信號(hào)驅(qū)動(dòng)I/O功能, 并通過(guò)系統(tǒng)調(diào)用sigaction安裝一個(gè)信號(hào)處理函數(shù)(此系統(tǒng)調(diào)用立即返回,進(jìn)程繼續(xù)工作,它是非阻塞的)。當(dāng)數(shù)據(jù)報(bào)準(zhǔn)備好被讀時(shí),就為該進(jìn)程生成一個(gè)SIGIO信號(hào)。隨即可以在信號(hào)處理程序中調(diào)用recvfrom來(lái)讀數(shù)據(jù)報(bào),井通知主循環(huán)數(shù)據(jù)已準(zhǔn)備好被處理中。也可以通知主循環(huán),讓它來(lái)讀數(shù)據(jù)報(bào)。

image.png

異步I/O模型

告知內(nèi)核啟動(dòng)某個(gè)操作,并讓內(nèi)核在整個(gè)操作完成后(包括將數(shù)據(jù)從內(nèi)核拷貝到用戶自己的緩沖區(qū))通知用戶進(jìn)程,這種模型和信號(hào)驅(qū)動(dòng)模型的主要區(qū)別是:信號(hào)驅(qū)動(dòng)IO:由內(nèi)核通知我們何時(shí)可以啟動(dòng)一個(gè)IO操作,異步IO模型:由內(nèi)核通知我們IO操作何時(shí)完成


image.png

用戶進(jìn)程發(fā)起read操作之后,立刻就可以開始去做其他的事了,從kernel的角度,當(dāng)它受到一個(gè)asynchronous read之后,首先它會(huì)立刻返回,不會(huì)對(duì)用戶進(jìn)程產(chǎn)生任何block,然后,kernel會(huì)等待數(shù)據(jù)準(zhǔn)備完成,然后再將數(shù)據(jù)拷貝到用戶進(jìn)程內(nèi)存,當(dāng)著一切都完成之后,kernel會(huì)給用戶進(jìn)程發(fā)送一個(gè)signal,告訴它read操作已經(jīng)完成。

小結(jié) 前面幾種都是同步IO,在內(nèi)核數(shù)據(jù)copy到用戶空間都是阻塞的。最后一種是異步IO,通過(guò)API把IO操作交給操作系統(tǒng)處理,當(dāng)前進(jìn)程不關(guān)心具體IO的實(shí)現(xiàn),通過(guò)回調(diào)函數(shù)或者信號(hào)量通知當(dāng)前進(jìn)程直接對(duì)IO返回結(jié)果進(jìn)行處理。一個(gè)IO操作其實(shí)分成了兩個(gè)步驟,發(fā)起IO請(qǐng)求和實(shí)際的IO操作,同步IO和異步IO的區(qū)別就在于第二步是否阻塞,如果實(shí)際的IO讀寫阻塞請(qǐng)求進(jìn)程,那就是同步IO,因此前四種都是同步IO,如果不阻塞,而是操作系統(tǒng)幫你做完IO操作再將結(jié)果返回給你,那就是異步IO。阻塞IO和非阻塞IO的區(qū)別在于第一步,發(fā)起IO請(qǐng)求是否會(huì)被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞IO,如果不阻塞,那就是非阻塞IO.

舉個(gè)簡(jiǎn)單例子來(lái)說(shuō)明:

有A,B,C,D四個(gè)人在釣魚:
A用的是最老式的魚竿,所以呢,得一直守著,等到魚上鉤了再拉桿;
B的魚竿有個(gè)功能,能夠顯示是否有魚上鉤,所以呢,B沒(méi)事就睡覺,隔會(huì)再看看有沒(méi)有魚上鉤,有的話就迅速拉桿;
C用的魚竿和B差不多,但他想了一個(gè)好辦法,就是同時(shí)放好幾根魚竿,然后守在旁邊,一旦有顯示說(shuō)魚上鉤了,它就將對(duì)應(yīng)的魚竿拉起來(lái);
D是個(gè)有錢人,干脆雇了一個(gè)人幫他釣魚,一旦那個(gè)人把魚釣上來(lái)了,就給D發(fā)個(gè)短信。

A: 阻塞IO, B: 非阻塞IO: C: NIO D: AIO

AIO BIO NIO

  • AIO異步非阻塞IO,AIO方式適用于連接數(shù)目多且連接比較長(zhǎng)的架構(gòu),充分調(diào)用OS參與并發(fā)操作,編程比較復(fù)雜,JDK7開始支持。
  • NIO同步非阻塞IO,適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器,并發(fā)局限于應(yīng)用中,編程比較復(fù)雜,JDK1.4開始支持。
  • BIO同步阻塞IO,適用于連接數(shù)目比較小且固定的架構(gòu),這種方式對(duì)服務(wù)器資源要求比較高,并發(fā)局限于應(yīng)用中,JDK1.4以前的唯一選擇,但程序直觀簡(jiǎn)單易理解。

Java對(duì)BIO、NIO、AIO的支持:

Java BIO: 同步并阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)端就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷,可以通過(guò)線程池機(jī)制改善
JAVA NIO: 同步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)請(qǐng)求一個(gè)線程,即客戶端發(fā)送的連接請(qǐng)求都會(huì)注冊(cè)到多路復(fù)用器上,多路復(fù)用器輪詢到連接有IO請(qǐng)求才啟動(dòng)一個(gè)線程進(jìn)行處理。
Java AIO: 異步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)有效請(qǐng)求一個(gè)線程,客戶端的IO請(qǐng)求都是由OS先完成了再通知服務(wù)器應(yīng)用去啟動(dòng)線程進(jìn)行處理。

Selector

Nio中的selector具體是一個(gè)什么樣的東西?想想一個(gè)場(chǎng)景:在一個(gè)養(yǎng)雞場(chǎng),有這么一個(gè)人,每天的工作就是不停檢查幾個(gè)特殊的雞籠,如果有雞進(jìn)來(lái),有雞出去,有雞生蛋,有雞生病等等,就把相應(yīng)的情況記錄下來(lái),如果雞場(chǎng)的負(fù)責(zé)人想知道情況,只需要詢問(wèn)那個(gè)人即可。在這里,這個(gè)人就相當(dāng)Selector,每個(gè)雞籠相當(dāng)于一個(gè)SocketChannel,每個(gè)線程通過(guò)一個(gè)Selector管理多個(gè)SocketChannel,


image.png

為了實(shí)現(xiàn)Selector管理多個(gè)SocketChannel,必須將具體得socketChannel對(duì)象注冊(cè)到Selector,并聲明需要監(jiān)聽的事件,一共有四種事件:
1、connect:客戶端連接服務(wù)端事件,對(duì)應(yīng)值為SelectionKey.OPCONNECT(8)

2、accept:服務(wù)端接收客戶端連接事件,對(duì)應(yīng)值為SelectionKey.OPACCEPT(16)

3、read:讀事件,對(duì)應(yīng)值為SelectionKey.OPREAD(1)

4、write:寫事件,對(duì)應(yīng)值為SelectionKey.OPWRITE(4)

當(dāng)SocketChannel有對(duì)應(yīng)的事件發(fā)生時(shí),Selector都可以觀察到,并進(jìn)行處理。

服務(wù)端代碼

ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
    int n = selector.select();
    if (n == 0) continue;
    Iterator ite = this.selector.selectedKeys().iterator();
    while(ite.hasNext()){
        SelectionKey key = (SelectionKey)ite.next();
        if (key.isAcceptable()){
            SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
            clntChan.configureBlocking(false);
            //將選擇器注冊(cè)到連接到的客戶端信道,
            //并指定該信道key值的屬性為OP_READ,
            //同時(shí)為該信道指定關(guān)聯(lián)的附件
            clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
        }
        if (key.isReadable()){
            handleRead(key);
        }
        if (key.isWritable() && key.isValid()){
            handleWrite(key);
        }
        if (key.isConnectable()){
            System.out.println("isConnectable = true");
        }
      ite.remove();
    }
}

服務(wù)端操作過(guò)程:

  1. 創(chuàng)建ServerSocketChannel實(shí)例,并綁定指定端口
  2. 創(chuàng)建selector實(shí)例
  3. 將serverSocketChannel注冊(cè)到selector,并指定事件OP_ACCEPT,最底層的socket通過(guò)channel和selector建立關(guān)聯(lián)
  4. 如果沒(méi)有準(zhǔn)備好的socket,select方法會(huì)被阻塞一段時(shí)間
  5. 如果底層有socket已經(jīng)準(zhǔn)備好,selector的select方法會(huì)返回socket的個(gè)數(shù),而且selectedKeys方法會(huì)返回socket對(duì)應(yīng)的事件(connect, accept、read、write);
  6. 根據(jù)事件類型,進(jìn)行不同的邏輯處理

在步驟3中,selector只注冊(cè)了serverSocketChannel的OP_ACCEPT事件,如果有客戶端A連接服務(wù),執(zhí)行select方法時(shí),可以通過(guò)serverSocketChannel獲取客戶端A的socketChannel,并在selector上注冊(cè)socketChannel的OP_READ事件。如果客戶端A發(fā)送數(shù)據(jù),會(huì)觸發(fā)read事件,下次輪詢調(diào)用select方法時(shí),就能通過(guò)socketChannel讀取數(shù)據(jù),同時(shí)在selector上注冊(cè)該socketChannel的OP_WRITE事件,實(shí)現(xiàn)服務(wù)器往客戶端寫數(shù)據(jù)。

Selector實(shí)現(xiàn)原理

下載openjdk源碼,我們一探究竟,


image.png

比較清晰得看到,openjdk中Selector的實(shí)現(xiàn)是SelectorImpl,
然后SelectorImpl又將職責(zé)委托給了具體的平臺(tái),比如圖中框出的linux2.6以后才有的EpollSelectorImpl, Windows平臺(tái)則是WindowsSelectorImpl, MacOSX平臺(tái)是KQueueSelectorImpl. 從名字也可以猜到,openjdk肯定在底層還是用epoll,kqueue,iocp這些技術(shù)來(lái)實(shí)現(xiàn)的I/O多路復(fù)用

獲取selector

眾所周知,Selector.open()可以得到一個(gè)Selector實(shí)例,怎么實(shí)現(xiàn)的呢?

// Selector.java
public static Selector open() throws IOException {
    // 首先找到provider,然后再打開Selector
    return SelectorProvider.provider().openSelector();
}

// java.nio.channels.spi.SelectorProvider
    public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                            // 這里就是打開Selector的真正方法
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

在openjdk中,每個(gè)操作系統(tǒng)都有一個(gè)sun.nio.ch.DefaultSelectorProvider實(shí)現(xiàn),如果系統(tǒng)是Linux的話,真正創(chuàng)建的是sun.nio.ch.EPollSelectorProvider,如果是macos,真正創(chuàng)建的是 KQueueSelectorProvider,我們以linux的EpollSelector為例說(shuō)明:

public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }
}

很直觀,這樣我們?cè)贚inux平臺(tái)就得到了最終的Selector實(shí)現(xiàn):sun.nio.ch.EPollSelectorImpl

EPollSelector如何進(jìn)行select

epoll原理

epoll是Linux下的一種IO多路復(fù)用技術(shù),可以非常高效的處理數(shù)以百萬(wàn)計(jì)的socket句柄。
三個(gè)epoll相關(guān)的系統(tǒng)調(diào)用:

  • int epoll_create(int size)
    epoll_create建立一個(gè)epoll對(duì)象。參數(shù)size是內(nèi)核保證能夠正確處理的最大句柄數(shù),多于這個(gè)最大數(shù)時(shí)內(nèi)核可不保證效果。

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
    epoll_ctl可以操作epoll_create創(chuàng)建的epoll,如將socket句柄加入到epoll中讓其監(jiān)控,或把epoll正在監(jiān)控的某個(gè)socket句柄移出epoll。

  • int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
    epoll_wait在調(diào)用時(shí),在給定的timeout時(shí)間內(nèi),所監(jiān)控的句柄中有事件發(fā)生時(shí),就返回用戶態(tài)的進(jìn)程。

epoll內(nèi)部實(shí)現(xiàn)大概如下:

  • epoll初始化時(shí),會(huì)向內(nèi)核注冊(cè)一個(gè)文件系統(tǒng),用于存儲(chǔ)被監(jiān)控的句柄文件,調(diào)用epoll_create時(shí),會(huì)在這個(gè)文件系統(tǒng)中創(chuàng)建一個(gè)file節(jié)點(diǎn)。同時(shí)epoll會(huì)開辟自己的內(nèi)核高速緩存區(qū),以紅黑樹的結(jié)構(gòu)保存句柄,以支持快速的查找、插入、刪除。還會(huì)再建立一個(gè)list鏈表,用于存儲(chǔ)準(zhǔn)備就緒的事件。
  • 當(dāng)執(zhí)行epoll_ctl時(shí),除了把socket句柄放到epoll文件系統(tǒng)里file對(duì)象對(duì)應(yīng)的紅黑樹上之外,還會(huì)給內(nèi)核中斷處理程序注冊(cè)一個(gè)回調(diào)函數(shù),告訴內(nèi)核,如果這個(gè)句柄的中斷到了,就把它放到準(zhǔn)備就緒list鏈表里。所以,當(dāng)一個(gè)socket上有數(shù)據(jù)到了,內(nèi)核在把網(wǎng)卡上的數(shù)據(jù)copy到內(nèi)核中后,就把socket插入到就緒鏈表里。
  • 當(dāng)epoll_wait調(diào)用時(shí),僅僅觀察就緒鏈表里有沒(méi)有數(shù)據(jù),如果有數(shù)據(jù)就返回,否則就sleep,超時(shí)時(shí)立刻返回。

Epoll fd的創(chuàng)建

EPollSelectorImpl的構(gòu)造器代碼如下:


  EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);

        this.epfd = EPoll.create();
        this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);

        try {
     // makePipe返回管道的2個(gè)文件描述符,編碼在一個(gè)long類型的變量中
    // 高32位代表讀 低32位代表寫
    // 使用pipe為了實(shí)現(xiàn)Selector的wakeup邏輯
            long fds = IOUtil.makePipe(false);
            this.fd0 = (int) (fds >>> 32);
            this.fd1 = (int) fds;
        } catch (IOException ioe) {
            EPoll.freePollArray(pollArrayAddress);
            FileDispatcherImpl.closeIntFD(epfd);
            throw ioe;
        }

        // register one end of the socket pair for wakeups
        EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }
}
    static native int create() throws IOException; // epollCreate方法,這是個(gè)native方法。

在Epoll.c中可以看到:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_create(JNIEnv *env, jclass clazz) {
    /* size hint not used in modern kernels */
    int epfd = epoll_create(256);
    if (epfd < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

可以看到最后還是使用了操作系統(tǒng)的api: epoll_create函數(shù)

Epoll wait等待內(nèi)核IO事件

調(diào)用Selector.select(),

 @Override
    public final int select(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("Negative timeout");
        return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
    }
 private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }
    
 protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        assert Thread.holdsLock(this);

        // epoll_wait timeout is int
        int to = (int) Math.min(timeout, Integer.MAX_VALUE);
        boolean blocking = (to != 0);
        boolean timedPoll = (to > 0);

        int numEntries;
        processUpdateQueue();
        processDeregisterQueue();
        try {
            begin(blocking);

            do {
                long startTime = timedPoll ? System.nanoTime() : 0;
                // 
                numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
                if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                    // timed poll interrupted so need to adjust timeout
                    long adjust = System.nanoTime() - startTime;
                    to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                    if (to <= 0) {
                        // timeout expired so no retry
                        numEntries = 0;
                    }
                }
            } while (numEntries == IOStatus.INTERRUPTED);
            assert IOStatus.check(numEntries);

        } finally {
            end(blocking);
        }
        processDeregisterQueue();
        return processEvents(numEntries, action);
    }
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_wait(JNIEnv *env, jclass clazz, jint epfd,
                           jlong address, jint numfds, jint timeout)
{
    struct epoll_event *events = jlong_to_ptr(address);
        // 發(fā)起epoll_wait系統(tǒng)調(diào)用等待內(nèi)核事件
    int res = epoll_wait(epfd, events, numfds, timeout);
    if (res < 0) {
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
            return IOS_THROWN;
        }
    }
    return res;
}

可以看到,最后還是發(fā)起的epoll_wait系統(tǒng)調(diào)用.

epoll control以及openjdk對(duì)事件管理的封裝

JDK中對(duì)于注冊(cè)到Selector上的IO事件關(guān)系是使用SelectionKey來(lái)表示,代表了Channel感興趣的事件,如Read,Write,Connect,Accept.

調(diào)用Selector.register()時(shí)均會(huì)將事件存儲(chǔ)到EpollArrayWrapper的成員變量eventsLow和eventsHigh中

// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用數(shù)組保存事件變更, 數(shù)組的最大長(zhǎng)度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超過(guò)數(shù)組長(zhǎng)度的事件會(huì)緩存到這個(gè)map中,等待下次處理
private Map<Integer,Byte> eventsHigh;

/**
 * Sets the pending update events for the given file descriptor. This
 * method has no effect if the update events is already set to KILLED,
 * unless {@code force} is {@code true}.
 */
private void setUpdateEvents(int fd, byte events, boolean force) {
    // 判斷fd和數(shù)組長(zhǎng)度
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}

在EpollArrayWrapper.poll()的時(shí)候, 首先會(huì)調(diào)用updateRegistrations,

/**
 * Returns the pending update events for the given file descriptor.
 */
private byte getUpdateEvents(int fd) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        return eventsLow[fd];
    } else {
        Byte result = eventsHigh.get(Integer.valueOf(fd));
        // result should never be null
        return result.byteValue();
    }
}

/**
 * Update the pending registrations.
 */
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            // 從保存的eventsLow和eventsHigh里取出事件
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                // 判斷操作類型以傳給epoll_ctl
                // 沒(méi)有指定EPOLLET事件類型
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // 熟悉的epoll_ctl
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    // 發(fā)起epoll_ctl調(diào)用來(lái)進(jìn)行IO事件的管理
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}

簡(jiǎn)而言之,register最后還是使用的epoll_ctl

有個(gè)小細(xì)節(jié)是jdk沒(méi)有指定ET(邊緣觸發(fā))還是LT(水平觸發(fā)),所以默認(rèn)會(huì)用LT.

水平觸發(fā)(level-trggered)
只要文件描述符關(guān)聯(lián)的讀內(nèi)核緩沖區(qū)非空,有數(shù)據(jù)可以讀取,就一直發(fā)出可讀信號(hào)進(jìn)行通知,
當(dāng)文件描述符關(guān)聯(lián)的內(nèi)核寫緩沖區(qū)不滿,有空間可以寫入,就一直發(fā)出可寫信號(hào)進(jìn)行通知
LT模式支持阻塞和非阻塞兩種方式。epoll默認(rèn)的模式是LT。

邊緣觸發(fā)(edge-triggered)

當(dāng)文件描述符關(guān)聯(lián)的讀內(nèi)核緩沖區(qū)由空轉(zhuǎn)化為非空的時(shí)候,則發(fā)出可讀信號(hào)進(jìn)行通知,
當(dāng)文件描述符關(guān)聯(lián)的內(nèi)核寫緩沖區(qū)由滿轉(zhuǎn)化為不滿的時(shí)候,則發(fā)出可寫信號(hào)進(jìn)行通知
兩者的區(qū)別在哪里呢?水平觸發(fā)是只要讀緩沖區(qū)有數(shù)據(jù),就會(huì)一直觸發(fā)可讀信號(hào),而邊緣觸發(fā)僅僅在空變?yōu)榉强盏臅r(shí)候通知一次.

LT(level triggered)是缺省的工作方式,并且同時(shí)支持block和no-block socket.在這種做法中,內(nèi)核告訴你一個(gè)文件描述符是否就緒了,然后你可以對(duì)這個(gè)就緒的fd進(jìn)行IO操作。如果你不作任何操作,內(nèi)核還是會(huì)繼續(xù)通知你的,所以,這種模式編程出錯(cuò)誤可能性要小一點(diǎn)。傳統(tǒng)的select/poll都是這種模型的代表.

水平觸發(fā)和邊緣觸發(fā)模式區(qū)別

讀緩沖區(qū)剛開始是空的
讀緩沖區(qū)寫入2KB數(shù)據(jù)
水平觸發(fā)和邊緣觸發(fā)模式此時(shí)都會(huì)發(fā)出可讀信號(hào)
收到信號(hào)通知后,讀取了1kb的數(shù)據(jù),讀緩沖區(qū)還剩余1KB數(shù)據(jù)
水平觸發(fā)會(huì)再次進(jìn)行通知,而邊緣觸發(fā)不會(huì)再進(jìn)行通知

所以,邊緣觸發(fā)需要一次性的把緩沖區(qū)的數(shù)據(jù)讀完為止,也就是一直讀,直到讀到EGAIN為止,EGAIN說(shuō)明緩沖區(qū)已經(jīng)空了,因?yàn)檫@一點(diǎn),邊緣觸發(fā)需要設(shè)置文件句柄為非阻塞

//水平觸發(fā)
ret = read(fd, buf, sizeof(buf));

//邊緣觸發(fā)
while(true) {
    ret = read(fd, buf, sizeof(buf);
    if (ret == EAGAIN) break;
}

在AbstractSelectorImpl中有3個(gè)set保存事件

// Public views of the key sets
// 注冊(cè)的所有事件
private Set<SelectionKey> publicKeys;             // Immutable
// 內(nèi)核返回的IO事件封裝,表示哪些fd有數(shù)據(jù)可讀可寫
private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition

// 取消的事件
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

在EpollArrayWrapper.poll調(diào)用完成之后, 會(huì)調(diào)用updateSelectedKeys來(lái)更新上面的三set

private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        int nextFD = pollWrapper.getDescriptor(i);
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}
image.png

jdk中Selector是對(duì)操作系統(tǒng)的IO多路復(fù)用調(diào)用的一個(gè)封裝,在Linux中就是對(duì)epoll的封裝。epoll實(shí)質(zhì)上是將event loop交給了內(nèi)核,因?yàn)榫W(wǎng)絡(luò)數(shù)據(jù)都是首先到內(nèi)核的,直接內(nèi)核處理可以避免無(wú)謂的系統(tǒng)調(diào)用和數(shù)據(jù)拷貝, 性能是最好的。jdk中對(duì)IO事件的封裝是SelectionKey, 保存Channel關(guān)心的事件。

簡(jiǎn)單說(shuō),nio是依賴操作系統(tǒng)的實(shí)現(xiàn),java并不能一個(gè)線程同時(shí)監(jiān)聽多個(gè)socket,
,在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型實(shí)現(xiàn),是基于IO復(fù)用技術(shù)的非阻塞IO。在JDK1.5 update10和linux core2.6以上版本,sun優(yōu)化了Selctor的實(shí)現(xiàn),底層使用epoll替換了select/poll。

彩蛋
io到epoll (1).png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容