NIO的網(wǎng)絡(luò)事件
????????Unsafe是內(nèi)部接口,聚合在Channel中協(xié)助進(jìn)行網(wǎng)絡(luò)讀寫相關(guān)的操作,Channel的內(nèi)部輔助類,不應(yīng)該被Netty的上層使用者調(diào)用,所以被命名為Unsafe。
????????NIO網(wǎng)絡(luò)事件主要有OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE四種,服務(wù)端ServerSocketChannel只支持OP_ACCEPT操作,服務(wù)端的SocketChannel只支持OP_READ、OP_WRITE操作,客戶端的SocketChannel支持OP_CONNECT、OP_WRITE、OP_READ。
????????OP_ACCEPT:收到一個客戶端的連接請求時就緒
????????OP_CONNECT:只有客戶端SocketChannel會注冊該操作
????????OP_READ:當(dāng)OS的讀緩沖區(qū)有數(shù)據(jù)可讀時,搞作就緒
????????OP_WRITE:當(dāng)OS的寫緩沖區(qū)中有空間的空間時,操作就緒
Unsafe處理I/O事件
????????在NioEventLoop介紹中,說到NioEventLoop線程啟動后,run方法中執(zhí)行了一個死循環(huán)用于發(fā)現(xiàn)I/O事件。發(fā)現(xiàn)事件后會調(diào)用NioEventLoop#processSelectedKeys()處理I/O事件:

????????根據(jù)selectedKeys有沒有值分為processSelectedKeysOptimized()方法和processSelectedKeysPlain(selector.selectedKeys())方法,實(shí)現(xiàn)邏輯差不多,本例中是有值的,進(jìn)入processSelectedKeysOptimized()方法如下:
????????因?yàn)榇藭r的Channel是NioServerSocketChannel,是AbstractNioChannel的實(shí)現(xiàn)類,所以進(jìn)入
processSelectedKey(k, (AbstractNioChannel) a),代碼如下:
????????在此方法中就看到了對四種I/O時間的處理。
OP_ACCEPT事件處理
??????????當(dāng)客戶端發(fā)起connect的時候,服務(wù)端會收到OP_ACCEPT事件,此時,服務(wù)端調(diào)用accept方法獲得一個SocketChannel,然后將其封裝成一個NioSocketChannel注冊到workGroup上。
??????????由上述代碼可以看到OP_READ與OP_ACCEPT都是進(jìn)的同一個if,都是調(diào)用的unsafe.read(),但是二者的unsafe并不是用一個對象,即read有不同的實(shí)現(xiàn),處理OP_ACCEPT時間的unsafe為NioMessageUnsafe對象,進(jìn)入其read方法代碼如下:
??????????進(jìn)入處理讀取數(shù)據(jù)的操作doReadMessages()方法,代碼如下:
??????????調(diào)用ServerSocketChannel.accept()獲取一個SocketChannel并將其封裝成一個NioSocketChannel放到readBuf中,NioSocketChannel的初始化與NioServerSocketChannel類似,但是初始化的unsafe不再是NioMessageUnsafe對象而是NioSocketChannelUnsafe(繼承自NioByteUnsafe,NioByteUnsafe是channelI/O操作具體實(shí)現(xiàn)類)對象,初始化時會將當(dāng)前的NioServerSocketChannel傳入,賦值到NioSockerChannel的parent屬性中,將SelectionKey.OP_READ賦值給成員變量readInterestOp等。
??????????將存NioSocketChannel的readBuf作為參數(shù)傳入pipeline.fireChannelRead()方法,將NioSocketChannel作為ChannelRead事件的傳播數(shù)據(jù)進(jìn)行傳播。
??????????在NioServerSocketChannel啟動注冊流程中說到NioServerSocketChannel在init的時候新建了一個任務(wù)將ServerBootstrapAcceptor加入到了所關(guān)聯(lián)的pipeline中,此處的ChannelRead事件傳播會調(diào)用ServerBootstrapAcceptor的channelRead方法,代碼如下:
??????????可以看到設(shè)置NioSocketChannel參數(shù)后,將其異步注冊到childGroup,并注冊一個監(jiān)聽器,在注冊完成時回調(diào),主要用于注冊失敗時關(guān)閉此NioSocketChannel。其注冊流程與NioServerSocketChannel大致相同,不再贅述。注冊過程中完成將NioSocketChannel注冊到childGroup中的一個NioEventLoop,將SocketChannel注冊到NioEventLoop的Selector上,將SelectionKey.OP_READ設(shè)為感興趣事件等工作。此時Selector就開始監(jiān)聽這個SocketChannel的讀事件。
OP_CONNECT事件處理
??????????OP_CONNECT事件是客戶端的事件,只執(zhí)行一次,客戶端在連接的時候如果沒能獲得連接成功的結(jié)果,就會將OP_CONNECT設(shè)為感興趣時間,輪詢等待事件觸發(fā),代碼在NioSocketChannel#doConnect方法中:
??????????SelectionKey.OP_CONNECT準(zhǔn)備就緒后,將SelectionKey.OP_CONNECT事件從SelectionKey所感興趣的事件中移除,然后調(diào)用unsafe.finishConnect()方法:
??????????此時isActive()返回為false,然后在doFinishConnect()中會調(diào)用javaChannel().finishConnect()來標(biāo)識連接完成,然后執(zhí)行fulfillConnectPromise(connectPromise,wasActive)方法如下:
??????????再次調(diào)用isActive(),此時因?yàn)橐呀?jīng)調(diào)用過javaChannel().finishConnect()所以返回true,然后調(diào)用promise.trySuccess()方法看是否有用戶異步取消了連接。然后不管連接是否被取消都觸發(fā)pipeline的fireChannelActive()傳播事件,該事件傳播會調(diào)用到AbstractNioChannel#doBeginRead()方法,將SelectionKey.OP_READ標(biāo)記為感興趣事件。
??????????此時Selector會監(jiān)聽NioSocketChannel上是否有可讀數(shù)據(jù)。
OP_READ事件處理
??????????當(dāng)有數(shù)據(jù)可以讀取時,OP_READ事件被觸發(fā),上文中描述過NioSocketChannel將SelectorKey.OP_READ標(biāo)記為感興趣事件,所以此時觸發(fā)的讀事件調(diào)用的unsafe.read()是NioByteUnsafe#read(),代碼如下:
??????????首先獲取當(dāng)前的pipeline。然后獲取一個ByteBufAllocater此處默認(rèn)為PooledByteBufAllocator實(shí)現(xiàn),即基于內(nèi)存池的堆外內(nèi)存實(shí)現(xiàn)(實(shí)現(xiàn)原理參考內(nèi)存池源碼分析),然后獲取一個AdaptiveRecvByteBufAllocator對象用于計(jì)算一個最優(yōu)緩沖區(qū)大小用于創(chuàng)建緩沖區(qū)。
??????????開啟一個循環(huán),先從內(nèi)存池分配一個byteBuf,然后從SocketChannel中將內(nèi)容寫到byteBuf中去,從byteBuf的writerIndex開始寫入數(shù)據(jù),writerIndex會增加所寫入的字節(jié)數(shù)。并且設(shè)置了最大可從SocketChannel讀取的數(shù)據(jù)大小為allocHandle.attemptedBytesRead(),這也是byteBuf的容量大小。
??????????然后判斷讀到的字節(jié)數(shù),如果小于等于0,說明沒讀到數(shù)據(jù)釋放byteBuf,退出循環(huán)。如果讀到了數(shù)據(jù)累加讀消息次數(shù),調(diào)用pipeline.fireChannelRead()事件,在ChannelPipeline中傳播,此時可以看到,觸發(fā)fireChannelRead()事件是在每一次循環(huán)都會觸發(fā)一次,所以需要對收到的數(shù)據(jù)舉行編解碼來分割數(shù)據(jù)包。然后判斷是否需要繼續(xù)循環(huán),判斷依據(jù)是已讀字節(jié)數(shù)、已讀操作數(shù)等諸多變量,不再展開描述。
??????????循環(huán)結(jié)束后調(diào)用一次allocHandle.readComplete()來記錄本次讀循環(huán)的數(shù)據(jù)信息以用于預(yù)測下一次讀事件觸發(fā)時,應(yīng)該分配多大的ByteBuf容量更加合理些。然后觸發(fā)pipeline.fireChannelReadComplete()事件,并在PipelineChannel中傳播,此事件在循環(huán)外觸發(fā),因此每次OP_READ只會傳播一次ChannelReadComplete事件。然后判斷連接是否已關(guān)閉,是則執(zhí)行closeOnRead(pipeline)。
??????????在finally中,如果本次操作讀到有效數(shù)據(jù)且用戶配置設(shè)置為非自動讀取,會將OP_READ從感興趣事件中移除,希望讀取數(shù)據(jù)自行添加。
OP_WRITE事件處理
??????????一般不會注冊O(shè)P_WRITE事件,只有當(dāng)寫緩沖區(qū)已滿導(dǎo)致數(shù)據(jù)無法全部寫入時注冊O(shè)P_WRITE事件,當(dāng)寫緩沖區(qū)出現(xiàn)空閑時間觸發(fā)改時間,將未完成的寫數(shù)據(jù)操作完成,寫完后要將此事件注銷。