IO基本概念
Linux的內(nèi)核將所有外部設(shè)備都可以看做一個(gè)文件來(lái)操作,而對(duì)一個(gè)文件的讀寫都是通過(guò)內(nèi)核提供的系統(tǒng)調(diào)用,內(nèi)核給我們返回一個(gè)文件描述符file descriptor,文件描述符是一個(gè)數(shù)組索引,指向內(nèi)核維護(hù)的文件表格,應(yīng)用程序?qū)ξ募淖x寫就是通過(guò)文件描述符的操作完成。
那么我們對(duì)與外部設(shè)備的操作都可以看做對(duì)文件進(jìn)行操作。而對(duì)一個(gè)socket的讀寫也會(huì)有相應(yīng)的描述符,稱為socketfd(socket描述符)。
linux將內(nèi)存分為內(nèi)核區(qū),用戶區(qū)。linux內(nèi)核給我們管理所有的硬件資源,應(yīng)用程序通過(guò)調(diào)用系統(tǒng)調(diào)用和內(nèi)核交互,達(dá)到使用硬件資源的目的。應(yīng)用程序通過(guò)系統(tǒng)調(diào)用read發(fā)起一個(gè)讀操作,這時(shí)候內(nèi)核創(chuàng)建一個(gè)文件描述符,并通過(guò)驅(qū)動(dòng)程序向硬件發(fā)送讀指令,并將讀的的數(shù)據(jù)放在這個(gè)描述符對(duì)應(yīng)結(jié)構(gòu)體的內(nèi)核緩存區(qū)中,然后再把這個(gè)數(shù)據(jù)讀到用戶進(jìn)程空間中,這樣完成了一次讀操作;但是大家都知道I/O設(shè)備相比cpu的速度是極慢的。linux提供的read系統(tǒng)調(diào)用,也是一個(gè)阻塞函數(shù)。這樣我們的應(yīng)用進(jìn)程在發(fā)起read系統(tǒng)調(diào)用時(shí),就必須阻塞,就進(jìn)程被掛起而等待文件描述符的讀就緒,那么什么是文件描述符讀就緒,什么是寫就緒?
讀就緒:就是這個(gè)文件描述符的接收緩沖區(qū)中的數(shù)據(jù)字節(jié)數(shù)大于等于套接字接收緩沖區(qū)低水位標(biāo)記的當(dāng)前大??;
寫就緒:該描述符發(fā)送緩沖區(qū)的可用空間字節(jié)數(shù)大于等于描述符發(fā)送緩沖區(qū)低水位標(biāo)記的當(dāng)前大小。(如果是socket fd,說(shuō)明上一個(gè)數(shù)據(jù)已經(jīng)發(fā)送完成)。
接收低水位標(biāo)記和發(fā)送低水位標(biāo)記:由應(yīng)用程序指定,比如應(yīng)用程序指定接收低水位為64個(gè)字節(jié)。那么接收緩沖區(qū)有64個(gè)字節(jié),才算fd讀就緒;
綜上所述,一個(gè)基本的IO,它會(huì)涉及到兩個(gè)系統(tǒng)對(duì)象,一個(gè)是調(diào)用這個(gè)IO的進(jìn)程對(duì)象,另一個(gè)就是系統(tǒng)內(nèi)核,當(dāng)一個(gè)read操作發(fā)生時(shí),它會(huì)經(jīng)歷如下階段:
- 進(jìn)程對(duì)象通過(guò)read系統(tǒng)調(diào)用向內(nèi)核發(fā)起讀請(qǐng)求
- 內(nèi)核向硬件發(fā)送讀指令,并等待讀就緒。
- 內(nèi)核把將要讀取的數(shù)據(jù)復(fù)制到內(nèi)核緩沖區(qū)
- 數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶進(jìn)程空間中
同步與異步
所謂同步就是發(fā)出一個(gè)調(diào)用后,在沒(méi)有得到結(jié)果之前該調(diào)用就不返回,就是調(diào)用者主動(dòng)等待這個(gè)調(diào)用的結(jié)果。 異步則相反,調(diào)用發(fā)出后,這個(gè)調(diào)用就直接返回了,調(diào)用者不會(huì)立刻得到結(jié)果,而是在調(diào)用發(fā)出后,被調(diào)用者通過(guò)回調(diào)函數(shù)等方式來(lái)告知調(diào)用者。
舉個(gè)通俗的例子:
你打電話問(wèn)書店老板有沒(méi)有《分布式系統(tǒng)》這本書,如果是同步通信機(jī)制,書店老板會(huì)說(shuō),你稍等,”我查一下”,然后開始查啊查,等查好了(可能是5秒,也可能是一天)告訴你結(jié)果(返回結(jié)果)。
而異步通信機(jī)制,書店老板直接告訴你我查一下啊,查好了打電話給你,然后直接掛電話了(不返回結(jié)果)。然后查好了,他會(huì)主動(dòng)打電話給你。在這里老板通過(guò)“回電”這種方式來(lái)回調(diào)。
本質(zhì)上,訪問(wèn)數(shù)據(jù)的方式,同步需要當(dāng)前線程讀寫數(shù)據(jù),在讀寫數(shù)據(jù)的過(guò)程中數(shù)據(jù)可能還沒(méi)ready,可能會(huì)阻塞,而異步io則是操作系統(tǒng)等數(shù)據(jù)ready之后會(huì)通知進(jìn)程數(shù)據(jù)好了,可以直接讀了
阻塞io模型:
在缺省情形下,所有文件操作都是阻塞的,在進(jìn)程空間中調(diào)用recvfrom,其系統(tǒng)調(diào)用直到數(shù)據(jù)報(bào)到達(dá)且被拷貝到應(yīng)用進(jìn)程的緩沖區(qū)中或者發(fā)生錯(cuò)誤才返回,期間一直在等待。我們就說(shuō)進(jìn)程在從調(diào)用recvfrom開始到它返回的整段時(shí)間內(nèi)是被阻塞的。

當(dāng)用戶進(jìn)程調(diào)用了recvfrom這個(gè)系統(tǒng)調(diào)用,kernel就開始了IO的第一個(gè)階段,準(zhǔn)備數(shù)據(jù),對(duì)于network IO, 很多時(shí)候數(shù)據(jù)在一開始還沒(méi)有到達(dá),比如還沒(méi)有收到一個(gè)完整的UDP包,這個(gè)時(shí)候kernel就要等待足夠的數(shù)據(jù)到來(lái)。而用戶進(jìn)程這邊,整個(gè)進(jìn)程會(huì)被阻塞,當(dāng)kernel一直等到數(shù)據(jù)準(zhǔn)備好了,它就會(huì)將數(shù)據(jù)從kernel中拷貝到用戶內(nèi)存,然后kernel返回結(jié)果,用戶進(jìn)程才解除block狀態(tài),重新運(yùn)行起來(lái)。
非阻塞IO模型
進(jìn)程把一個(gè)套接口設(shè)置為非阻塞是在通知內(nèi)核:當(dāng)所請(qǐng)求的IO操作不能滿足要求時(shí),不把本進(jìn)程投入睡眠,而是返回一個(gè)錯(cuò)誤。也就是說(shuō)當(dāng)數(shù)據(jù)沒(méi)有到達(dá)時(shí)并不等待,而是以一個(gè)錯(cuò)誤返回。

從圖中可以看出,當(dāng)用戶進(jìn)程發(fā)出read操作時(shí),如果kernel中的數(shù)據(jù)還沒(méi)有準(zhǔn)備好,它并不會(huì)block用戶進(jìn)程,而是立刻返回一個(gè)error。從用戶進(jìn)程角度講,它發(fā)起一個(gè)read操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè)error時(shí),它就知道數(shù)據(jù)還沒(méi)有準(zhǔn)備好,于是它可以再次發(fā)送read操作,一旦kernel中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存,然后返回。所以,用戶進(jìn)程其實(shí)是需要不斷的主動(dòng)詢問(wèn)kernel數(shù)據(jù)好了沒(méi)有。
IO復(fù)用模型
linux提供select/poll,進(jìn)程通過(guò)將一個(gè)或多個(gè)fd傳遞給select或poll系統(tǒng)調(diào)用,select/poll會(huì)不斷輪詢所負(fù)責(zé)的所有socket,可以偵測(cè)許多fd是否就緒,但select和poll是順序掃描fd是否就緒,并且支持的fd數(shù)量有限。linux還提供了epoll系統(tǒng)調(diào)用,它是基于事件驅(qū)動(dòng)的方式,而不是順序掃描,當(dāng)某個(gè)socket有數(shù)據(jù)到達(dá)了,可以直接通知用戶進(jìn)程,而不需要順序輪詢掃描,提高了效率。

當(dāng)進(jìn)程調(diào)用了select,整個(gè)進(jìn)程會(huì)被block,同時(shí),kernel會(huì)監(jiān)視所有select負(fù)責(zé)的socket,當(dāng)任何一個(gè)socket的數(shù)據(jù)準(zhǔn)備好了,select就會(huì)返回,這個(gè)圖和阻塞IO的圖其實(shí)并沒(méi)有多大區(qū)別,事實(shí)上,還更差一點(diǎn),因?yàn)檫@里需要使用兩個(gè)System call,select和recvFrom,而blocking io只調(diào)用了一個(gè)system call(recvfrom),但是select的好處在與它可以同時(shí)處理多個(gè)connection,(如果處理的連接數(shù)不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優(yōu)勢(shì)并不是對(duì)于單個(gè)連接能處理得更快,而是在于能處理更多的連接。)
信號(hào)驅(qū)動(dòng)異步IO模型
首先開啟套接口信號(hào)驅(qū)動(dòng)I/O功能, 并通過(guò)系統(tǒng)調(diào)用sigaction安裝一個(gè)信號(hào)處理函數(shù)(此系統(tǒng)調(diào)用立即返回,進(jìn)程繼續(xù)工作,它是非阻塞的)。當(dāng)數(shù)據(jù)報(bào)準(zhǔn)備好被讀時(shí),就為該進(jìn)程生成一個(gè)SIGIO信號(hào)。隨即可以在信號(hào)處理程序中調(diào)用recvfrom來(lái)讀數(shù)據(jù)報(bào),井通知主循環(huán)數(shù)據(jù)已準(zhǔn)備好被處理中。也可以通知主循環(huán),讓它來(lái)讀數(shù)據(jù)報(bào)。

異步I/O模型
告知內(nèi)核啟動(dòng)某個(gè)操作,并讓內(nèi)核在整個(gè)操作完成后(包括將數(shù)據(jù)從內(nèi)核拷貝到用戶自己的緩沖區(qū))通知用戶進(jìn)程,這種模型和信號(hào)驅(qū)動(dòng)模型的主要區(qū)別是:信號(hào)驅(qū)動(dòng)IO:由內(nèi)核通知我們何時(shí)可以啟動(dòng)一個(gè)IO操作,異步IO模型:由內(nèi)核通知我們IO操作何時(shí)完成

用戶進(jìn)程發(fā)起read操作之后,立刻就可以開始去做其他的事了,從kernel的角度,當(dāng)它受到一個(gè)asynchronous read之后,首先它會(huì)立刻返回,不會(huì)對(duì)用戶進(jìn)程產(chǎn)生任何block,然后,kernel會(huì)等待數(shù)據(jù)準(zhǔn)備完成,然后再將數(shù)據(jù)拷貝到用戶進(jìn)程內(nèi)存,當(dāng)著一切都完成之后,kernel會(huì)給用戶進(jìn)程發(fā)送一個(gè)signal,告訴它read操作已經(jīng)完成。
小結(jié) 前面幾種都是同步IO,在內(nèi)核數(shù)據(jù)copy到用戶空間都是阻塞的。最后一種是異步IO,通過(guò)API把IO操作交給操作系統(tǒng)處理,當(dāng)前進(jìn)程不關(guān)心具體IO的實(shí)現(xiàn),通過(guò)回調(diào)函數(shù)或者信號(hào)量通知當(dāng)前進(jìn)程直接對(duì)IO返回結(jié)果進(jìn)行處理。一個(gè)IO操作其實(shí)分成了兩個(gè)步驟,發(fā)起IO請(qǐng)求和實(shí)際的IO操作,同步IO和異步IO的區(qū)別就在于第二步是否阻塞,如果實(shí)際的IO讀寫阻塞請(qǐng)求進(jìn)程,那就是同步IO,因此前四種都是同步IO,如果不阻塞,而是操作系統(tǒng)幫你做完IO操作再將結(jié)果返回給你,那就是異步IO。阻塞IO和非阻塞IO的區(qū)別在于第一步,發(fā)起IO請(qǐng)求是否會(huì)被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞IO,如果不阻塞,那就是非阻塞IO.
舉個(gè)簡(jiǎn)單例子來(lái)說(shuō)明:
有A,B,C,D四個(gè)人在釣魚:
A用的是最老式的魚竿,所以呢,得一直守著,等到魚上鉤了再拉桿;
B的魚竿有個(gè)功能,能夠顯示是否有魚上鉤,所以呢,B沒(méi)事就睡覺,隔會(huì)再看看有沒(méi)有魚上鉤,有的話就迅速拉桿;
C用的魚竿和B差不多,但他想了一個(gè)好辦法,就是同時(shí)放好幾根魚竿,然后守在旁邊,一旦有顯示說(shuō)魚上鉤了,它就將對(duì)應(yīng)的魚竿拉起來(lái);
D是個(gè)有錢人,干脆雇了一個(gè)人幫他釣魚,一旦那個(gè)人把魚釣上來(lái)了,就給D發(fā)個(gè)短信。
A: 阻塞IO, B: 非阻塞IO: C: NIO D: AIO
AIO BIO NIO
- AIO異步非阻塞IO,AIO方式適用于連接數(shù)目多且連接比較長(zhǎng)的架構(gòu),充分調(diào)用OS參與并發(fā)操作,編程比較復(fù)雜,JDK7開始支持。
- NIO同步非阻塞IO,適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器,并發(fā)局限于應(yīng)用中,編程比較復(fù)雜,JDK1.4開始支持。
- BIO同步阻塞IO,適用于連接數(shù)目比較小且固定的架構(gòu),這種方式對(duì)服務(wù)器資源要求比較高,并發(fā)局限于應(yīng)用中,JDK1.4以前的唯一選擇,但程序直觀簡(jiǎn)單易理解。
Java對(duì)BIO、NIO、AIO的支持:
Java BIO: 同步并阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)端就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷,可以通過(guò)線程池機(jī)制改善
JAVA NIO: 同步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)請(qǐng)求一個(gè)線程,即客戶端發(fā)送的連接請(qǐng)求都會(huì)注冊(cè)到多路復(fù)用器上,多路復(fù)用器輪詢到連接有IO請(qǐng)求才啟動(dòng)一個(gè)線程進(jìn)行處理。
Java AIO: 異步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)有效請(qǐng)求一個(gè)線程,客戶端的IO請(qǐng)求都是由OS先完成了再通知服務(wù)器應(yīng)用去啟動(dòng)線程進(jìn)行處理。
Selector
Nio中的selector具體是一個(gè)什么樣的東西?想想一個(gè)場(chǎng)景:在一個(gè)養(yǎng)雞場(chǎng),有這么一個(gè)人,每天的工作就是不停檢查幾個(gè)特殊的雞籠,如果有雞進(jìn)來(lái),有雞出去,有雞生蛋,有雞生病等等,就把相應(yīng)的情況記錄下來(lái),如果雞場(chǎng)的負(fù)責(zé)人想知道情況,只需要詢問(wèn)那個(gè)人即可。在這里,這個(gè)人就相當(dāng)Selector,每個(gè)雞籠相當(dāng)于一個(gè)SocketChannel,每個(gè)線程通過(guò)一個(gè)Selector管理多個(gè)SocketChannel,

為了實(shí)現(xiàn)Selector管理多個(gè)SocketChannel,必須將具體得socketChannel對(duì)象注冊(cè)到Selector,并聲明需要監(jiān)聽的事件,一共有四種事件:
1、connect:客戶端連接服務(wù)端事件,對(duì)應(yīng)值為SelectionKey.OPCONNECT(8)
2、accept:服務(wù)端接收客戶端連接事件,對(duì)應(yīng)值為SelectionKey.OPACCEPT(16)
3、read:讀事件,對(duì)應(yīng)值為SelectionKey.OPREAD(1)
4、write:寫事件,對(duì)應(yīng)值為SelectionKey.OPWRITE(4)
當(dāng)SocketChannel有對(duì)應(yīng)的事件發(fā)生時(shí),Selector都可以觀察到,并進(jìn)行處理。
服務(wù)端代碼
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if (n == 0) continue;
Iterator ite = this.selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey)ite.next();
if (key.isAcceptable()){
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false);
//將選擇器注冊(cè)到連接到的客戶端信道,
//并指定該信道key值的屬性為OP_READ,
//同時(shí)為該信道指定關(guān)聯(lián)的附件
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
if (key.isReadable()){
handleRead(key);
}
if (key.isWritable() && key.isValid()){
handleWrite(key);
}
if (key.isConnectable()){
System.out.println("isConnectable = true");
}
ite.remove();
}
}
服務(wù)端操作過(guò)程:
- 創(chuàng)建ServerSocketChannel實(shí)例,并綁定指定端口
- 創(chuàng)建selector實(shí)例
- 將serverSocketChannel注冊(cè)到selector,并指定事件OP_ACCEPT,最底層的socket通過(guò)channel和selector建立關(guān)聯(lián)
- 如果沒(méi)有準(zhǔn)備好的socket,select方法會(huì)被阻塞一段時(shí)間
- 如果底層有socket已經(jīng)準(zhǔn)備好,selector的select方法會(huì)返回socket的個(gè)數(shù),而且selectedKeys方法會(huì)返回socket對(duì)應(yīng)的事件(connect, accept、read、write);
- 根據(jù)事件類型,進(jìn)行不同的邏輯處理
在步驟3中,selector只注冊(cè)了serverSocketChannel的OP_ACCEPT事件,如果有客戶端A連接服務(wù),執(zhí)行select方法時(shí),可以通過(guò)serverSocketChannel獲取客戶端A的socketChannel,并在selector上注冊(cè)socketChannel的OP_READ事件。如果客戶端A發(fā)送數(shù)據(jù),會(huì)觸發(fā)read事件,下次輪詢調(diào)用select方法時(shí),就能通過(guò)socketChannel讀取數(shù)據(jù),同時(shí)在selector上注冊(cè)該socketChannel的OP_WRITE事件,實(shí)現(xiàn)服務(wù)器往客戶端寫數(shù)據(jù)。
Selector實(shí)現(xiàn)原理
下載openjdk源碼,我們一探究竟,

比較清晰得看到,openjdk中Selector的實(shí)現(xiàn)是SelectorImpl,
然后SelectorImpl又將職責(zé)委托給了具體的平臺(tái),比如圖中框出的linux2.6以后才有的EpollSelectorImpl, Windows平臺(tái)則是WindowsSelectorImpl, MacOSX平臺(tái)是KQueueSelectorImpl. 從名字也可以猜到,openjdk肯定在底層還是用epoll,kqueue,iocp這些技術(shù)來(lái)實(shí)現(xiàn)的I/O多路復(fù)用
獲取selector
眾所周知,Selector.open()可以得到一個(gè)Selector實(shí)例,怎么實(shí)現(xiàn)的呢?
// Selector.java
public static Selector open() throws IOException {
// 首先找到provider,然后再打開Selector
return SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 這里就是打開Selector的真正方法
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
在openjdk中,每個(gè)操作系統(tǒng)都有一個(gè)sun.nio.ch.DefaultSelectorProvider實(shí)現(xiàn),如果系統(tǒng)是Linux的話,真正創(chuàng)建的是sun.nio.ch.EPollSelectorProvider,如果是macos,真正創(chuàng)建的是 KQueueSelectorProvider,我們以linux的EpollSelector為例說(shuō)明:
public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
}
很直觀,這樣我們?cè)贚inux平臺(tái)就得到了最終的Selector實(shí)現(xiàn):sun.nio.ch.EPollSelectorImpl
EPollSelector如何進(jìn)行select
epoll原理
epoll是Linux下的一種IO多路復(fù)用技術(shù),可以非常高效的處理數(shù)以百萬(wàn)計(jì)的socket句柄。
三個(gè)epoll相關(guān)的系統(tǒng)調(diào)用:
int epoll_create(int size)
epoll_create建立一個(gè)epoll對(duì)象。參數(shù)size是內(nèi)核保證能夠正確處理的最大句柄數(shù),多于這個(gè)最大數(shù)時(shí)內(nèi)核可不保證效果。int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
epoll_ctl可以操作epoll_create創(chuàng)建的epoll,如將socket句柄加入到epoll中讓其監(jiān)控,或把epoll正在監(jiān)控的某個(gè)socket句柄移出epoll。int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
epoll_wait在調(diào)用時(shí),在給定的timeout時(shí)間內(nèi),所監(jiān)控的句柄中有事件發(fā)生時(shí),就返回用戶態(tài)的進(jìn)程。
epoll內(nèi)部實(shí)現(xiàn)大概如下:
- epoll初始化時(shí),會(huì)向內(nèi)核注冊(cè)一個(gè)文件系統(tǒng),用于存儲(chǔ)被監(jiān)控的句柄文件,調(diào)用epoll_create時(shí),會(huì)在這個(gè)文件系統(tǒng)中創(chuàng)建一個(gè)file節(jié)點(diǎn)。同時(shí)epoll會(huì)開辟自己的內(nèi)核高速緩存區(qū),以紅黑樹的結(jié)構(gòu)保存句柄,以支持快速的查找、插入、刪除。還會(huì)再建立一個(gè)list鏈表,用于存儲(chǔ)準(zhǔn)備就緒的事件。
- 當(dāng)執(zhí)行epoll_ctl時(shí),除了把socket句柄放到epoll文件系統(tǒng)里file對(duì)象對(duì)應(yīng)的紅黑樹上之外,還會(huì)給內(nèi)核中斷處理程序注冊(cè)一個(gè)回調(diào)函數(shù),告訴內(nèi)核,如果這個(gè)句柄的中斷到了,就把它放到準(zhǔn)備就緒list鏈表里。所以,當(dāng)一個(gè)socket上有數(shù)據(jù)到了,內(nèi)核在把網(wǎng)卡上的數(shù)據(jù)copy到內(nèi)核中后,就把socket插入到就緒鏈表里。
- 當(dāng)epoll_wait調(diào)用時(shí),僅僅觀察就緒鏈表里有沒(méi)有數(shù)據(jù),如果有數(shù)據(jù)就返回,否則就sleep,超時(shí)時(shí)立刻返回。
Epoll fd的創(chuàng)建
EPollSelectorImpl的構(gòu)造器代碼如下:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
this.epfd = EPoll.create();
this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
try {
// makePipe返回管道的2個(gè)文件描述符,編碼在一個(gè)long類型的變量中
// 高32位代表讀 低32位代表寫
// 使用pipe為了實(shí)現(xiàn)Selector的wakeup邏輯
long fds = IOUtil.makePipe(false);
this.fd0 = (int) (fds >>> 32);
this.fd1 = (int) fds;
} catch (IOException ioe) {
EPoll.freePollArray(pollArrayAddress);
FileDispatcherImpl.closeIntFD(epfd);
throw ioe;
}
// register one end of the socket pair for wakeups
EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
}
static native int create() throws IOException; // epollCreate方法,這是個(gè)native方法。
在Epoll.c中可以看到:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_create(JNIEnv *env, jclass clazz) {
/* size hint not used in modern kernels */
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
可以看到最后還是使用了操作系統(tǒng)的api: epoll_create函數(shù)
Epoll wait等待內(nèi)核IO事件
調(diào)用Selector.select(),
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
// epoll_wait timeout is int
int to = (int) Math.min(timeout, Integer.MAX_VALUE);
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
begin(blocking);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
//
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);
} finally {
end(blocking);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_wait(JNIEnv *env, jclass clazz, jint epfd,
jlong address, jint numfds, jint timeout)
{
struct epoll_event *events = jlong_to_ptr(address);
// 發(fā)起epoll_wait系統(tǒng)調(diào)用等待內(nèi)核事件
int res = epoll_wait(epfd, events, numfds, timeout);
if (res < 0) {
if (errno == EINTR) {
return IOS_INTERRUPTED;
} else {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
return IOS_THROWN;
}
}
return res;
}
可以看到,最后還是發(fā)起的epoll_wait系統(tǒng)調(diào)用.
epoll control以及openjdk對(duì)事件管理的封裝
JDK中對(duì)于注冊(cè)到Selector上的IO事件關(guān)系是使用SelectionKey來(lái)表示,代表了Channel感興趣的事件,如Read,Write,Connect,Accept.
調(diào)用Selector.register()時(shí)均會(huì)將事件存儲(chǔ)到EpollArrayWrapper的成員變量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// 使用數(shù)組保存事件變更, 數(shù)組的最大長(zhǎng)度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超過(guò)數(shù)組長(zhǎng)度的事件會(huì)緩存到這個(gè)map中,等待下次處理
private Map<Integer,Byte> eventsHigh;
/**
* Sets the pending update events for the given file descriptor. This
* method has no effect if the update events is already set to KILLED,
* unless {@code force} is {@code true}.
*/
private void setUpdateEvents(int fd, byte events, boolean force) {
// 判斷fd和數(shù)組長(zhǎng)度
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
在EpollArrayWrapper.poll()的時(shí)候, 首先會(huì)調(diào)用updateRegistrations,
/**
* Returns the pending update events for the given file descriptor.
*/
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
}
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
// 從保存的eventsLow和eventsHigh里取出事件
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
// 判斷操作類型以傳給epoll_ctl
// 沒(méi)有指定EPOLLET事件類型
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 熟悉的epoll_ctl
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// 發(fā)起epoll_ctl調(diào)用來(lái)進(jìn)行IO事件的管理
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
簡(jiǎn)而言之,register最后還是使用的epoll_ctl
有個(gè)小細(xì)節(jié)是jdk沒(méi)有指定ET(邊緣觸發(fā))還是LT(水平觸發(fā)),所以默認(rèn)會(huì)用LT.
水平觸發(fā)(level-trggered)
只要文件描述符關(guān)聯(lián)的讀內(nèi)核緩沖區(qū)非空,有數(shù)據(jù)可以讀取,就一直發(fā)出可讀信號(hào)進(jìn)行通知,
當(dāng)文件描述符關(guān)聯(lián)的內(nèi)核寫緩沖區(qū)不滿,有空間可以寫入,就一直發(fā)出可寫信號(hào)進(jìn)行通知
LT模式支持阻塞和非阻塞兩種方式。epoll默認(rèn)的模式是LT。
邊緣觸發(fā)(edge-triggered)
當(dāng)文件描述符關(guān)聯(lián)的讀內(nèi)核緩沖區(qū)由空轉(zhuǎn)化為非空的時(shí)候,則發(fā)出可讀信號(hào)進(jìn)行通知,
當(dāng)文件描述符關(guān)聯(lián)的內(nèi)核寫緩沖區(qū)由滿轉(zhuǎn)化為不滿的時(shí)候,則發(fā)出可寫信號(hào)進(jìn)行通知
兩者的區(qū)別在哪里呢?水平觸發(fā)是只要讀緩沖區(qū)有數(shù)據(jù),就會(huì)一直觸發(fā)可讀信號(hào),而邊緣觸發(fā)僅僅在空變?yōu)榉强盏臅r(shí)候通知一次.
LT(level triggered)是缺省的工作方式,并且同時(shí)支持block和no-block socket.在這種做法中,內(nèi)核告訴你一個(gè)文件描述符是否就緒了,然后你可以對(duì)這個(gè)就緒的fd進(jìn)行IO操作。如果你不作任何操作,內(nèi)核還是會(huì)繼續(xù)通知你的,所以,這種模式編程出錯(cuò)誤可能性要小一點(diǎn)。傳統(tǒng)的select/poll都是這種模型的代表.
水平觸發(fā)和邊緣觸發(fā)模式區(qū)別
讀緩沖區(qū)剛開始是空的
讀緩沖區(qū)寫入2KB數(shù)據(jù)
水平觸發(fā)和邊緣觸發(fā)模式此時(shí)都會(huì)發(fā)出可讀信號(hào)
收到信號(hào)通知后,讀取了1kb的數(shù)據(jù),讀緩沖區(qū)還剩余1KB數(shù)據(jù)
水平觸發(fā)會(huì)再次進(jìn)行通知,而邊緣觸發(fā)不會(huì)再進(jìn)行通知
所以,邊緣觸發(fā)需要一次性的把緩沖區(qū)的數(shù)據(jù)讀完為止,也就是一直讀,直到讀到EGAIN為止,EGAIN說(shuō)明緩沖區(qū)已經(jīng)空了,因?yàn)檫@一點(diǎn),邊緣觸發(fā)需要設(shè)置文件句柄為非阻塞
//水平觸發(fā)
ret = read(fd, buf, sizeof(buf));
//邊緣觸發(fā)
while(true) {
ret = read(fd, buf, sizeof(buf);
if (ret == EAGAIN) break;
}
在AbstractSelectorImpl中有3個(gè)set保存事件
// Public views of the key sets
// 注冊(cè)的所有事件
private Set<SelectionKey> publicKeys; // Immutable
// 內(nèi)核返回的IO事件封裝,表示哪些fd有數(shù)據(jù)可讀可寫
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
// 取消的事件
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
在EpollArrayWrapper.poll調(diào)用完成之后, 會(huì)調(diào)用updateSelectedKeys來(lái)更新上面的三set
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}

jdk中Selector是對(duì)操作系統(tǒng)的IO多路復(fù)用調(diào)用的一個(gè)封裝,在Linux中就是對(duì)epoll的封裝。epoll實(shí)質(zhì)上是將event loop交給了內(nèi)核,因?yàn)榫W(wǎng)絡(luò)數(shù)據(jù)都是首先到內(nèi)核的,直接內(nèi)核處理可以避免無(wú)謂的系統(tǒng)調(diào)用和數(shù)據(jù)拷貝, 性能是最好的。jdk中對(duì)IO事件的封裝是SelectionKey, 保存Channel關(guān)心的事件。
簡(jiǎn)單說(shuō),nio是依賴操作系統(tǒng)的實(shí)現(xiàn),java并不能一個(gè)線程同時(shí)監(jiān)聽多個(gè)socket,
,在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型實(shí)現(xiàn),是基于IO復(fù)用技術(shù)的非阻塞IO。在JDK1.5 update10和linux core2.6以上版本,sun優(yōu)化了Selctor的實(shí)現(xiàn),底層使用epoll替換了select/poll。
彩蛋
